Re: Package Release Announcement: Spark SQL on HBase Astro
That's awesome Yan. I was considering Phoenix for SQL calls to HBase, since Cassandra supports CQL but HBase QL support was lacking. I will get back to you as I start using it on our loads. I am assuming the latencies won't be much different from accessing HBase through tsdb asynchbase, as that's one more option I am looking into.

On Mon, Jul 27, 2015 at 10:12 PM, Yan Zhou.sc yan.zhou...@huawei.com wrote: HBase in this case is no different from any other Spark SQL data source, so yes, you should be able to access HBase data through Astro from Spark SQL's JDBC interface. Graphically, the access path is as follows: Spark SQL JDBC Interface -> Spark SQL Parser/Analyzer/Optimizer -> Astro Optimizer -> HBase Scans/Gets -> ... -> HBase Region server. Regards, Yan

From: Debasish Das [mailto:debasish.da...@gmail.com] Sent: Monday, July 27, 2015 10:02 PM To: Yan Zhou.sc Cc: Bing Xiao (Bing); dev; user Subject: RE: Package Release Announcement: Spark SQL on HBase Astro
Hi Yan, Is it possible to access the HBase table through the Spark SQL JDBC layer? Thanks. Deb

On Jul 22, 2015 9:03 PM, Yan Zhou.sc yan.zhou...@huawei.com wrote: Yes, but not all SQL-standard insert variants.

From: Debasish Das [mailto:debasish.da...@gmail.com] Sent: Wednesday, July 22, 2015 7:36 PM To: Bing Xiao (Bing) Cc: user; dev; Yan Zhou.sc Subject: Re: Package Release Announcement: Spark SQL on HBase Astro
Does it also support insert operations?

On Jul 22, 2015 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote: We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release. http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase The main features in this package, dubbed "Astro", include:
· Systematic and powerful handling of data pruning and intelligent scan, based on a partial evaluation technique
· HBase pushdown capabilities, such as custom filters and coprocessors, to support ultra-low-latency processing
· SQL and DataFrame support
· More SQL capabilities made possible (secondary index, bloom filter, primary key, bulk load, update)
· Joins with data from other sources
· Python/Java/Scala support
· Support for the latest Spark 1.4.0 release

The tests by the Huawei team and community contributors covered the following areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; custom filtering; DML; complex filtering on keys and non-keys; join/union with non-HBase data; DataFrame; multi-column family test. We will post the test results, including performance tests, in the middle of August. You are very welcome to try out or deploy the package, and to help improve the integration tests with various combinations of settings, extensive DataFrame tests, complex join/union tests, and extensive performance tests. Please use the "Issues" and "Pull Requests" links at the package homepage if you want to report bugs, improvements, or feature requests.

Special thanks to project owner and technical leader Yan Zhou, the Huawei global team, community contributors, and Databricks. Databricks has been providing great assistance from the design to the release. "Astro", the Spark SQL on HBase package, will be useful for ultra-low-latency query and analytics of large-scale data sets in vertical enterprises. We will continue to work with the community to develop new features and improve the code base. Your comments and suggestions are greatly appreciated. Yan Zhou / Bing Xiao Huawei Big Data team
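For concreteness, a minimal JDBC client sketch of the access path described above (all host, port, user, and table names are placeholders, and it assumes a Spark Thrift JDBC server has been started with the Astro package on its classpath; this is not Astro-specific code, just the standard Hive JDBC driver that Spark SQL's JDBC interface uses):

import java.sql.DriverManager

// Register the Hive JDBC driver used by the Spark SQL Thrift server.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
val stmt = conn.createStatement()
// "hbase_backed_table" is a placeholder for a table registered through Astro.
val rs = stmt.executeQuery("SELECT count(*) FROM hbase_backed_table")
while (rs.next()) println(rs.getLong(1))
rs.close(); stmt.close(); conn.close()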
A question about spark checkpoint
Hi, I have the following code that uses checkpoint to checkpoint the heavy ops, and it works well in that the last heavyOpRDD.foreach(println) does not recompute from the beginning. But when I re-run this program, the RDD computing chain is recomputed from the beginning. I thought it would also read from the checkpoint directory, since I still have the data there from the last run. Do I misunderstand how checkpoint works, or is there some configuration to make it work? Thanks

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointTest {
  def squareWithHeavyOp(x: Int) = {
    Thread.sleep(2000)
    println(s"squareWithHeavyOp $x")
    x * x
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CheckpointTest")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("file:///d:/checkpointDir")
    val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
    val heavyOpRDD = rdd.map(squareWithHeavyOp)
    heavyOpRDD.checkpoint()
    heavyOpRDD.foreach(println)
    println("Job 0 has been finished, press ENTER to do job 1")
    readLine()
    heavyOpRDD.foreach(println)
  }
}

bit1...@163.com
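For reference, checkpoint files written by one application run are not automatically picked up when the program is started again; the checkpoint is tied to the running SparkContext. A common workaround, sketched below with a placeholder path (this is not the checkpoint mechanism itself), is to materialize the expensive result explicitly and reload it on the next run if it already exists:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder output location for the expensive result.
val savePath = "file:///d:/heavyOpRDDOutput"
val fs = FileSystem.get(new URI(savePath), sc.hadoopConfiguration)

val heavyOpRDD =
  if (fs.exists(new Path(savePath))) {
    // Reuse the result materialized by a previous run.
    sc.objectFile[Int](savePath)
  } else {
    val computed = rdd.map(squareWithHeavyOp)
    computed.saveAsObjectFile(savePath)   // materialize once for later runs
    computed
  }
heavyOpRDD.foreach(println)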
pyspark/py4j tree error
I am using pyspark and I want to test the SQL function. I get this Java tree error. Any ideas?

iwaggDF.registerTempTable('iwagg')
hierDF.registerTempTable('hier')
res3 = sqlc.sql('select name, sum(amount) as amount from iwagg a left join hier b on a.segm=b.segm group by name order by sum(amount) desc').collect()

File "/home/dirk/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 281, in collect
  port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
File "/home/dirk/spark-1.4.1-bin-hadoop2.6/python/py4j/java_gateway.py", line 538, in __call__
  self.target_id, self.name)
File "/home/dirk/spark-1.4.1-bin-hadoop2.6/python/py4j/protocol.py", line 300, in get_return_value
  format(target_id, '.', name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o75.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [SUM(amount#326) DESC], true
 Exchange (RangePartitioning 200)
  Aggregate false, [name#6], [name#6,CombineSum(PartialSum#328) AS amount#326]
   Exchange (HashPartitioning 200)
    Aggregate true, [name#6], [name#6,SUM(CAST(amount#3, DoubleType)) AS PartialSum#328]
     Project [name#6,amount#3]
      HashOuterJoin [segm#1], [segm#5], LeftOuter, None
       Exchange (HashPartitioning 200)
        Project [amount#3,segm#1]
         PhysicalRDD [cust#0,segm#1,wsd#2,amount#3,trips#4], MapPartitionsRDD[7] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2
       Exchange (HashPartitioning 200)
        PhysicalRDD [segm#5,name#6], MapPartitionsRDD[15] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

Dirk
java.io.IOException: failure to login
Hi, I’ve posted this question to stackoverflow.com here http://stackoverflow.com/questions/31534458/failing-integration-test-for-apache-spark-streaming but it’s not getting any responses. I've been trying to track down an issue with some unit/integration tests I've been writing for a Spark Streaming project. When using Spark 1.1.1 my test passed. When I tried to upgrade to 1.4.0 (also tried 1.4.1) the test starts failing. I've managed to reduce the code needed to reproduce the issue down to the small integration test below. Interestingly, if I comment out the @RunWith annotation on the test, then the test passes correctly. Obviously I don't need the @RunWith annotation for this cut-down test, but the real tests make use of mocks fairly extensively, so I'd rather not have to drop using PowerMock.

package com.example;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
public class SampleTest {
  @Before
  public void setup() throws Exception {
    SparkConf conf = new SparkConf(false).setMaster("local[2]").setAppName("My app");
    JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1000));
  }

  @Test
  public void exampleTest() {
  }
}

Below is the exception I'm seeing:

java.io.IOException: failure to login
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:796)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:842)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:80)
at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:133)
at com.example.SampleTest.setup(SampleTest.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.internal.runners.MethodRoadie.runBefores(MethodRoadie.java:133)
at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:96)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
at
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282) at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87) at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120) at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34) at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122) at org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106) at org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53) at org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59) at
Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
That happens when your batch duration is less than your processing time; you need to set the StorageLevel to MEMORY_AND_DISK. If you are using the latest version of Spark and you are just exploring things, then you can go with the Kafka consumers that come with Spark itself. You will not have this issue with KafkaUtils.directStream since it is not a receiver-based consumer. Thanks Best Regards

On Tue, Jul 28, 2015 at 2:36 PM, Manohar Reddy manohar.re...@happiestminds.com wrote: Thanks Akhil, that solved it, but below is the new stack trace. Don’t feel bad, I am looking into it, but if you have the answer at your fingertips please share.

15/07/28 09:03:31 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 77, ip-10-252-7-70.us-west-2.compute.internal): java.lang.Exception: Could not compute split, block input-0-1438074176218 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, July 28, 2015 2:30 PM To: Manohar Reddy Cc: user@spark.apache.org Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
You need to trigger an action on your rowrdd for it to execute the map; you can do a rowrdd.count() for that. Thanks Best Regards

On Tue, Jul 28, 2015 at 2:18 PM, Manohar Reddy manohar.re...@happiestminds.com wrote: Hi Akhil, Thanks for the reply. I found the root cause but don’t know how to solve this. Below is the cause: this map function is never entered, and because of that all my list fields are empty. Please let me know what might be the cause of this snippet of code not executing; the map below is never entered.

JavaRDD<Row> rowrdd = rdd.map(new Function<MessageAndMetadata, Row>() {
  @Override
  public Row call(MessageAndMetadata arg0) throws Exception {
    System.out.println("inside thread map ca");
    String[] data = new String(arg0.getPayload()).split("\\|");
    int i = 0;
    for (String string : data) {
      if (i > 3) {
        if (i % 2 == 0) {
          fields.add(DataTypes.createStructField(string, DataTypes.StringType, true));
          System.out.println(string);
        } else {
          listvalues.add(string);
          System.out.println(string);
        }

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, July 28, 2015 1:52 PM To: Manohar Reddy Cc: user@spark.apache.org Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
Put a try catch inside your code and inside the catch print out the length or the list itself which causes the ArrayIndexOutOfBounds. It might happen that some of your data is not proper. Thanks Best Regards

On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 manohar.re...@happiestminds.com wrote: Hi Team, can somebody please help me out with what I am doing wrong to get the below exception while running my app on a Yarn cluster with Spark 1.4? I am getting a Kafka stream AND DOING foreachRDD and giving it to a new thread to process. Please find the below code snippet.
RE: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
Yaa got it Thanks Akhil. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, July 28, 2015 2:47 PM To: Manohar Reddy Cc: user@spark.apache.org Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client That happens when you batch duration is less than your processing time, you need to set StorageLevel to MEMORY_AND_DISK, if you are using the latest version of spark and you are just exploring things, then you can go with the kafka consumers that comes with Spark itself. You will not have this issue with KafkaUtils.directStream since it is not a receiver based consumer. Thanks Best Regards On Tue, Jul 28, 2015 at 2:36 PM, Manohar Reddy manohar.re...@happiestminds.commailto:manohar.re...@happiestminds.com wrote: Thanks Akhil.that solved now but below is the new stack trace. Don’t feel bad, am look into that but if it is there in your fingers please 15/07/28 09:03:31 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 77, ip-10-252-7-70.us-west-2.compute.internal): java.lang.Exception: Could not compute split, block input-0-1438074176218 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) From: Akhil Das [mailto:ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com] Sent: Tuesday, July 28, 2015 2:30 PM To: Manohar Reddy Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client You need to trigger an action on your rowrdd for it to execute the map, you can do a rowrdd.count() for that. Thanks Best Regards On Tue, Jul 28, 2015 at 2:18 PM, Manohar Reddy manohar.re...@happiestminds.commailto:manohar.re...@happiestminds.com wrote: Hi Akhil, Thanks for thereply.I found the root cause but don’t know how to solve this. Below is the cause.this map function not going inside to execute because of this all my list fields are empty. Please let me know what might be the cause to not execute this snippet of code.the below map is not execution not going inside. 
JavaRDDRow rowrdd=rdd.map(new FunctionMessageAndMetadata, Row() { @Override public Row call(MessageAndMetadata arg0) throws Exception { System.out.println(inside thread map ca); String[] data=new String(arg0.getPayload()).split(\\|); int i=0; for (String string : data) { if(i3){ if(i%2==0){ fields.add(DataTypes.createStructField(string, DataTypes.StringType, true)); System.out.println(string); }else{ listvalues.add(string); System.out.println(string); } From: Akhil Das [mailto:ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com] Sent: Tuesday, July 28, 2015 1:52 PM To: Manohar Reddy Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client Put a try catch inside your code and inside the catch print out the length or the list itself which causes the ArrayIndexOutOfBounds. It might happen that some of your data is not proper. Thanks Best Regards On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 manohar.re...@happiestminds.commailto:manohar.re...@happiestminds.com wrote: Hi Team, can please some body help me out what am
Re: Data from PostgreSQL to Spark
Hi Ayan, Thanks for the reply. It's around 5 GB across 10 tables. This data changes very frequently (every minute there are a few updates), so it's difficult to keep this data in Spark; if any updates happen on the main tables, how can I refresh the Spark data?

On 28 July 2015 at 02:11, ayan guha guha.a...@gmail.com wrote: You can call the DB connect once per partition. Please have a look at the design patterns for the foreach construct in the documentation. How big is your data in the DB? How often does that data change? You would be better off if the data were in Spark already.

On 28 Jul 2015 04:48, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for your reply. In parallel I will be hitting around 6000 calls to PostgreSQL, which is not good; my database will die, and these calls to the database will keep on increasing. Handling millions of requests is not an issue with HBase/NoSQL; any other alternative?

On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote: You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach, since you mention copying that data into HBase?

From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user
Hi All, I have a use case where I am consuming events from RabbitMQ using Spark Streaming. Each event has some fields on which I want to query PostgreSQL, bring back the data, do the join between the event data and the PostgreSQL data, and put the aggregated data into HDFS, so that I can run analytics queries over this data using Spark SQL. My question is: the PostgreSQL data is production data, so I don't want to hit it so many times. At any given second I may have 3000 events, which means I need to fire 3000 parallel queries at my PostgreSQL, and this data keeps on growing, so my database will go down. I can't migrate this PostgreSQL data since lots of systems use it, but I can take this data to some NoSQL store like HBase and query HBase; the issue there is, how can I make sure that HBase has up-to-date data? Can anyone suggest the best approach/method to handle this case? Regards Jeetendra
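For reference, a minimal sketch of the "data access API" approach Felix mentions, using the JDBC data source available in Spark 1.4 (the URL, table name, and credentials below are placeholders):

val pgDF = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:postgresql://dbhost:5432/mydb",   // placeholder connection string
  "dbtable"  -> "lookup_table",                          // placeholder table
  "user"     -> "spark",
  "password" -> "secret",
  "driver"   -> "org.postgresql.Driver"
)).load()

// The loaded DataFrame can then be joined with the event data in Spark SQL.
pgDF.registerTempTable("lookup_table")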
Re: use S3-Compatible Storage with spark
Hi, I recompiled and retried; now it's looking like this with s3a: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain. S3n is working fine (the only problem is still the endpoint).
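A sketch of wiring up s3a credentials and a custom endpoint through the Hadoop configuration (the property names are the ones used by recent Hadoop s3a clients, the values are placeholders, and older Hadoop releases may not support fs.s3a.endpoint):

// Placeholder keys and endpoint for an S3-compatible store.
sc.hadoopConfiguration.set("fs.s3a.access.key", "ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "SECRET_KEY")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.example.com")

val rdd = sc.textFile("s3a://my-bucket/path/")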
Spark SQL ArrayOutofBoundsException Question
Hello all, I am currently having an error with Spark SQL access Elasticsearch using Elasticsearch Spark integration. Below is the series of command I issued along with the stacktrace. I am unclear what the error could mean. I can print the schema correctly but error out if i try and display a few results. Can you guys point me in the right direction? scala sqlContext.read.format(org.elasticsearch.spark.sql).options(esOptions).load(reddit_comment_public-201507-v3/default).registerTempTable(reddit_comment) scala reddit_comment_df.printSchema root |-- data: struct (nullable = true) ||-- archived: boolean (nullable = true) ||-- author: string (nullable = true) ||-- author_flair_css_class: string (nullable = true) ||-- author_flair_text: string (nullable = true) ||-- body: string (nullable = true) ||-- body_html: string (nullable = true) ||-- controversiality: long (nullable = true) ||-- created: long (nullable = true) ||-- created_utc: long (nullable = true) ||-- distinguished: string (nullable = true) ||-- downs: long (nullable = true) ||-- edited: long (nullable = true) ||-- gilded: long (nullable = true) ||-- id: string (nullable = true) ||-- link_author: string (nullable = true) ||-- link_id: string (nullable = true) ||-- link_title: string (nullable = true) ||-- link_url: string (nullable = true) ||-- name: string (nullable = true) ||-- parent_id: string (nullable = true) ||-- replies: string (nullable = true) ||-- saved: boolean (nullable = true) ||-- score: long (nullable = true) ||-- score_hidden: boolean (nullable = true) ||-- subreddit: string (nullable = true) ||-- subreddit_id: string (nullable = true) ||-- ups: long (nullable = true) scala reddit_comment_df.show 15/07/27 20:38:31 INFO ScalaEsRowRDD: Reading from [reddit_comment_public-201507-v3/default] 15/07/27 20:38:31 INFO ScalaEsRowRDD: Discovered mapping {reddit_comment_public-201507-v3=[mappings=[default=[acquire_date=DATE, elasticsearch_date_partition_index=STRING, elasticsearch_language_partition_index=STRING, elasticsearch_type=STRING, source=[data=[archived=BOOLEAN, author=STRING, author_flair_css_class=STRING, author_flair_text=STRING, body=STRING, body_html=STRING, controversiality=LONG, created=LONG, created_utc=LONG, distinguished=STRING, downs=LONG, edited=LONG, gilded=LONG, id=STRING, link_author=STRING, link_id=STRING, link_title=STRING, link_url=STRING, name=STRING, parent_id=STRING, replies=STRING, saved=BOOLEAN, score=LONG, score_hidden=BOOLEAN, subreddit=STRING, subreddit_id=STRING, ups=LONG], kind=STRING], source_geo_location=GEO_POINT, source_id=STRING, source_language=STRING, source_time=DATE]]]} for [reddit_comment_public-201507-v3/default] 15/07/27 20:38:31 INFO SparkContext: Starting job: show at console:26 15/07/27 20:38:31 INFO DAGScheduler: Got job 13 (show at console:26) with 1 output partitions (allowLocal=false) 15/07/27 20:38:31 INFO DAGScheduler: Final stage: ResultStage 16(show at console:26) 15/07/27 20:38:31 INFO DAGScheduler: Parents of final stage: List() 15/07/27 20:38:31 INFO DAGScheduler: Missing parents: List() 15/07/27 20:38:31 INFO DAGScheduler: Submitting ResultStage 16 (MapPartitionsRDD[65] at show at console:26), which has no missing parents 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(7520) called with curMem=71364, maxMem=2778778828 15/07/27 20:38:31 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 7.3 KB, free 2.6 GB) 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(3804) called with curMem=78884, maxMem=2778778828 15/07/27 20:38:31 INFO 
MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 3.7 KB, free 2.6 GB) 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on 172.25.185.239:58296 (size: 3.7 KB, free: 2.6 GB) 15/07/27 20:38:31 INFO SparkContext: Created broadcast 13 from broadcast at DAGScheduler.scala:874 15/07/27 20:38:31 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 16 (MapPartitionsRDD[65] at show at console:26) 15/07/27 20:38:31 INFO TaskSchedulerImpl: Adding task set 16.0 with 1 tasks 15/07/27 20:38:31 INFO FairSchedulableBuilder: Added task set TaskSet_16 tasks to pool default 15/07/27 20:38:31 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 172, 172.25.185.164, ANY, 5085 bytes) 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on 172.25.185.164:50275 (size: 3.7 KB, free: 3.6 GB) 15/07/27 20:38:31 WARN TaskSetManager: Lost task 0.0 in stage 16.0 (TID 172, 172.25.185.164): java.lang.ArrayIndexOutOfBoundsException: -1 at scala.collection.mutable.ResizableArray$class.update(ResizableArray.scala:49) at scala.collection.mutable.ArrayBuffer.update(ArrayBuffer.scala:47) at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:29) at
Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
Put a try catch inside your code and inside the catch print out the length or the list itself which causes the ArrayIndexOutOfBounds. It might happen that some of your data is not proper. Thanks Best Regards

On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 manohar.re...@happiestminds.com wrote: Hi Team, can somebody please help me out with what I am doing wrong to get the below exception while running my app on a Yarn cluster with Spark 1.4? I am getting a Kafka stream AND DOING foreachRDD and giving it to a new thread to process. Please find the below code snippet.

JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
unionStreams.foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
  @Override
  public Void call(JavaRDD<MessageAndMetadata> rdd, Time time) throws Exception {
    new ThreadParam(rdd).start();
    return null;
  }
});
#############################
public ThreadParam(JavaRDD<MessageAndMetadata> rdd) {
  this.rdd = rdd;
  // this.context=context;
}

public void run(){
  final List<StructField> fields = new ArrayList<StructField>();
  List<String> listvalues = new ArrayList<String>();
  final List<String> meta = new ArrayList<String>();
  JavaRDD<Row> rowrdd = rdd.map(new Function<MessageAndMetadata, Row>() {
    @Override
    public Row call(MessageAndMetadata arg0) throws Exception {
      String[] data = new String(arg0.getPayload()).split("\\|");
      int i = 0;
      List<StructField> fields = new ArrayList<StructField>();
      List<String> listvalues = new ArrayList<String>();
      List<String> meta = new ArrayList<String>();
      for (String string : data) {
        if (i > 3) {
          if (i % 2 == 0) {
            fields.add(DataTypes.createStructField(string, DataTypes.StringType, true));
            // System.out.println(splitarr[i]);
          } else {
            listvalues.add(string);
            // System.out.println(splitarr[i]);
          }
        } else {
          meta.add(string);
        }
        i++;
      }
      int size = listvalues.size();
      return RowFactory.create(listvalues.get(25-25), listvalues.get(25-24), listvalues.get(25-23),
        listvalues.get(25-22), listvalues.get(25-21), listvalues.get(25-20),
        listvalues.get(25-19), listvalues.get(25-18), listvalues.get(25-17),
        listvalues.get(25-16), listvalues.get(25-15), listvalues.get(25-14),
        listvalues.get(25-13), listvalues.get(25-12), listvalues.get(25-11),
        listvalues.get(25-10), listvalues.get(25-9), listvalues.get(25-8),
        listvalues.get(25-7), listvalues.get(25-6), listvalues.get(25-5),
        listvalues.get(25-4), listvalues.get(25-3), listvalues.get(25-2), listvalues.get(25-1));
    }
  });
  SQLContext sqlContext = new SQLContext(rowrdd.context());
  StructType schema = DataTypes.createStructType(fields);
  System.out.println("before creating schema");
  DataFrame courseDf = sqlContext.createDataFrame(rowrdd, schema);
  courseDf.registerTempTable("course");
  courseDf.show();
  System.out.println("after creating schema");

BELOW IS THE COMMAND TO RUN THIS, AND NEXT IS THE STACK TRACE ERROR

MASTER=yarn-client /home/hadoop/spark/bin/spark-submit --class com.person.Consumer /mnt1/manohar/spark-load-from-db/targetpark-load-from-db-1.0-SNAPSHOT-jar-with-dependencies.jar

ERROR IS AS

15/07/27 14:45:01 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 72, ip-10-252-7-73.us-west-2.compute.internal): java.lang.ArrayIndexOutOfBoundsException: 0
at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
at
scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
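For reference, a sketch in Scala of the pattern under discussion (not the original Java code): the schema has to be assembled on the driver, while the map only produces Rows; lists populated inside the map run on the executors and never update driver-side variables, which is why the schema above ends up empty. Field names and payloadRDD (an RDD[String] of pipe-delimited payloads) are placeholders:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// Schema is defined on the driver, not inside the map.
val schema = StructType(Seq(
  StructField("col1", DataTypes.StringType, true),
  StructField("col2", DataTypes.StringType, true)))

// The map only turns each payload into a Row matching the schema.
val rowRDD = payloadRDD.map { payload =>
  val parts = payload.split("\\|")
  Row(parts(0), parts(1))
}

val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("course")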
RE: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
Hi Akhil, Thanks for thereply.I found the root cause but don’t know how to solve this. Below is the cause.this map function not going inside to execute because of this all my list fields are empty. Please let me know what might be the cause to not execute this snippet of code.the below map is not execution not going inside. JavaRDDRow rowrdd=rdd.map(new FunctionMessageAndMetadata, Row() { @Override public Row call(MessageAndMetadata arg0) throws Exception { System.out.println(inside thread map ca); String[] data=new String(arg0.getPayload()).split(\\|); int i=0; for (String string : data) { if(i3){ if(i%2==0){ fields.add(DataTypes.createStructField(string, DataTypes.StringType, true)); System.out.println(string); }else{ listvalues.add(string); System.out.println(string); } From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, July 28, 2015 1:52 PM To: Manohar Reddy Cc: user@spark.apache.org Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client Put a try catch inside your code and inside the catch print out the length or the list itself which causes the ArrayIndexOutOfBounds. It might happen that some of your data is not proper. Thanks Best Regards On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 manohar.re...@happiestminds.commailto:manohar.re...@happiestminds.com wrote: Hi Team, can please some body help me out what am doing wrong to get the below exception while running my app on Yarn cluster with spark 1.4. Kafka stream am getting AND DOING foreachRDD and giving it to new thread for process.please find the below code snippet. JavaDStreamMessageAndMetadata unionStreams = ReceiverLauncher.launch( jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY()); unionStreams .foreachRDD(new Function2JavaRDDlt;MessageAndMetadata, Time, Void() { @Override public Void call(JavaRDDMessageAndMetadata rdd, Time time) throws Exception { new ThreadParam(rdd).start(); return null; } }); # public ThreadParam(JavaRDDMessageAndMetadata rdd) { this.rdd = rdd; // this.context=context; } public void run(){ final ListStructField fields = new ArrayListStructField(); ListString listvalues=new ArrayList(); final ListString meta=new ArrayList(); JavaRDDRow rowrdd=rdd.map(new FunctionMessageAndMetadata, Row() { @Override public Row call(MessageAndMetadata arg0) throws Exception { String[] data=new String(arg0.getPayload()).split(\\|); int i=0; ListStructField fields = new ArrayListStructField(); ListString listvalues=new ArrayList(); ListString meta=new ArrayList(); for (String string : data) { if(i3){ if(i%2==0){ fields.add(DataTypes.createStructField(string, DataTypes.StringType, true)); // System.out.println(splitarr[i]); }else{ listvalues.add(string); // System.out.println(splitarr[i]); } }else{ meta.add(string); } i++; }int size=listvalues.size(); return RowFactory.create(listvalues.get(25-25),listvalues.get(25-24),listvalues.get(25-23), listvalues.get(25-22),listvalues.get(25-21),listvalues.get(25-20), listvalues.get(25-19),listvalues.get(25-18),listvalues.get(25-17), listvalues.get(25-16),listvalues.get(25-15),listvalues.get(25-14), listvalues.get(25-13),listvalues.get(25-12),listvalues.get(25-11),
Re: Data from PostgreSQL to Spark
I am trying to do that, but there will always be a data mismatch, since by the time Sqoop is fetching, the main database will have received many updates. There is something called incremental data fetch using Sqoop, but that hits the database rather than reading the WAL edits.

On 28 July 2015 at 02:52, santosh...@gmail.com wrote: Why can't you bulk pre-fetch the data to HDFS (like using Sqoop) instead of hitting Postgres multiple times? Sent from Windows Mail

From: ayan guha guha.a...@gmail.com Sent: Monday, July 27, 2015 4:41 PM To: Jeetendra Gangele gangele...@gmail.com Cc: felixcheun...@hotmail.com, user@spark.apache.org
You can call the DB connect once per partition. Please have a look at the design patterns for the foreach construct in the documentation. How big is your data in the DB? How often does that data change? You would be better off if the data were in Spark already.

On 28 Jul 2015 04:48, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for your reply. In parallel I will be hitting around 6000 calls to PostgreSQL, which is not good; my database will die, and these calls to the database will keep on increasing. Handling millions of requests is not an issue with HBase/NoSQL; any other alternative?

On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote: You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach, since you mention copying that data into HBase?

From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user
Hi All, I have a use case where I am consuming events from RabbitMQ using Spark Streaming. Each event has some fields on which I want to query PostgreSQL, bring back the data, do the join between the event data and the PostgreSQL data, and put the aggregated data into HDFS, so that I can run analytics queries over this data using Spark SQL. My question is: the PostgreSQL data is production data, so I don't want to hit it so many times. At any given second I may have 3000 events, which means I need to fire 3000 parallel queries at my PostgreSQL, and this data keeps on growing, so my database will go down. I can't migrate this PostgreSQL data since lots of systems use it, but I can take this data to some NoSQL store like HBase and query HBase; the issue there is, how can I make sure that HBase has up-to-date data? Can anyone suggest the best approach/method to handle this case? Regards Jeetendra
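For reference, a sketch of the "one connection per partition" pattern ayan refers to (the JDBC URL, credentials, and query are placeholders, and eventsRDD stands in for the RDD of event keys):

import java.sql.DriverManager

eventsRDD.foreachPartition { iter =>
  // One connection and one prepared statement per partition, not per record.
  val conn = DriverManager.getConnection("jdbc:postgresql://dbhost:5432/mydb", "spark", "secret")
  val stmt = conn.prepareStatement("SELECT * FROM lookup WHERE key = ?")
  iter.foreach { key =>
    stmt.setString(1, key)
    val rs = stmt.executeQuery()
    // ... combine the event with the looked-up row here ...
    rs.close()
  }
  stmt.close()
  conn.close()
}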
Re: ReceiverStream SPARK not able to cope up with 20,000 events /sec .
You need to find the bottleneck here: it could be your network (if the data is huge) or your producer code not actually pushing at 20k/s. If you are able to produce at 20k/s, then make sure you are able to receive at that rate (try it without Spark). Thanks Best Regards

On Sat, Jul 25, 2015 at 3:29 PM, anshu shukla anshushuk...@gmail.com wrote: My eventGen is emitting 20,000 events/sec, and I am using store(s1) in the receive() method to push data to the receiverStream. But this logic works fine only up to 4,000 events/sec, and no batches are seen being emitted at higher rates.

CODE: TOPOLOGY -
JavaDStream<String> sourcestream = ssc.receiverStream(new TetcCustomEventReceiver(datafilename, spoutlog, argumentClass.getScalingFactor(), datasetType));

CODE: TetcCustomEventReceiver -
public void receive(List<String> event) {
  StringBuffer tuple = new StringBuffer();
  msgId++;
  for (String s : event) {
    tuple.append(s).append(",");
  }
  String s1 = MsgIdAddandRemove.addMessageId(tuple.toString(), msgId);
  store(s1);
}

-- Thanks Regards, Anshu Shukla
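For reference, a minimal custom receiver sketch in Scala (the event source is simulated with a counter; a real implementation would read from the actual generator). store() itself is cheap, so the ceiling usually comes from the chosen storage level, the block interval, and how fast the downstream processing drains the received blocks:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SimpleEventReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = {
    // Read/generate events on a separate thread and hand them to Spark via store().
    new Thread("event-reader") {
      override def run(): Unit = {
        var i = 0L
        while (!isStopped()) {     // replace with reads from the real event source
          store(s"event-$i")
          i += 1
        }
      }
    }.start()
  }
  def onStop(): Unit = {}          // the loop above exits once isStopped() is true
}

// Usage: val stream = ssc.receiverStream(new SimpleEventReceiver)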
Spark-Cassandra connector DataFrame
Hi, I would like to get recommendations on using the Spark-Cassandra connector's DataFrame feature. I was trying to save a DataFrame containing 8 million rows to Cassandra through the Spark-Cassandra connector. Based on the Spark log, this single action took about 60 minutes to complete, which I think is a very slow process. Are there some configurations I need to check when using the Spark-Cassandra connector's DataFrame feature? From the Spark log, I can see that saving the DataFrame to Cassandra was performed in 200 small steps, and the Cassandra database was connected and disconnected 4 times during the 60 minutes; this number matches the number of nodes the Cassandra cluster has. I understand this DataFrame feature is experimental, and I am new to both Spark and Cassandra. Any suggestions are much appreciated. Thanks, Simon Wang
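For reference, a sketch of the DataFrame save path with the connector (keyspace and table names are placeholders). If the 200 steps come from the default spark.sql.shuffle.partitions of an upstream shuffle, repartitioning first trades them for fewer, larger save tasks:

import org.apache.spark.sql.SaveMode

df.repartition(32)                                   // optional: fewer, larger tasks than 200
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_ks", "table" -> "my_table"))   // placeholder names
  .mode(SaveMode.Append)
  .save()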
Re: Spark on Mesos - Shut down failed while running spark-shell
Hi Haripriya, Your master has registered it's public ip to be 127.0.0.1:5050 which won't be able to be reached by the slave node. If mesos didn't pick up the right ip you can specifiy one yourself via the --ip flag. Tim On Mon, Jul 27, 2015 at 8:32 PM, Haripriya Ayyalasomayajula aharipriy...@gmail.com wrote: Hi all, I am running Spark 1.4.1 on mesos 0.23.0 While I am able to start spark-shell on the node with mesos-master running, it works fine. But when I try to start spark-shell on mesos-slave nodes, I'm encounter this error. I greatly appreciate any help. 15/07/27 22:14:44 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/07/27 22:14:44 INFO SparkUI: Started SparkUI at http://10.142.0.140:4040 Warning: MESOS_NATIVE_LIBRARY is deprecated, use MESOS_NATIVE_JAVA_LIBRARY instead. Future releases will not support JNI bindings via MESOS_NATIVE_LIBRARY. Warning: MESOS_NATIVE_LIBRARY is deprecated, use MESOS_NATIVE_JAVA_LIBRARY instead. Future releases will not support JNI bindings via MESOS_NATIVE_LIBRARY. WARNING: Logging before InitGoogleLogging() is written to STDERR W0727 22:14:45.091286 33441 sched.cpp:1326] ** Scheduler driver bound to loopback interface! Cannot communicate with remote master(s). You might want to set 'LIBPROCESS_IP' environment variable to use a routable IP address. ** 2015-07-27 22:14:45,091:33222(0x7fff9e1fc700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5 2015-07-27 22:14:45,091:33222(0x7fff9e1fc700):ZOO_INFO@log_env@716: Client environment:host.name=nid00011 I0727 22:14:45.091995 33441 sched.cpp:157] Version: 0.23.0 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@723: Client environment:os.name=Linux 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@724: Client environment:os.arch=2.6.32-431.el6_1..8785-cray_ari_athena_c_cos 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@725: Client environment:os.version=#1 SMP Wed Jun 24 19:34:50 UTC 2015 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@733: Client environment:user.name=root 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@741: Client environment:user.home=/root 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@753: Client environment:user.dir=/opt/spark-1.4.1/spark-source 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=192.168.0.10:2181 sessionTimeout=1 watcher=0x7fffb561a8e0 sessionId=0 sessionPasswd=nullcontext=0x7ffdd930 flags=0 2015-07-27 22:14:45,092:33222(0x7fff6ebfd700):ZOO_INFO@check_events@1703: initiated connection to server [192.168.0.10:2181] 2015-07-27 22:14:45,096:33222(0x7fff6ebfd700):ZOO_INFO@check_events@1750: session establishment complete on server [192.168.0.10:2181], sessionId=0x14ed296a0fd000a, negotiated timeout=1 I0727 22:14:45.096891 33479 group.cpp:313] Group process (group(1)@ 127.0.0.1:45546) connected to ZooKeeper I0727 22:14:45.096914 33479 group.cpp:787] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0) I0727 22:14:45.096923 33479 group.cpp:385] Trying to create path '/mesos' in ZooKeeper I0727 22:14:45.099181 33471 detector.cpp:138] Detected a new leader: (id='4') I0727 22:14:45.099298 33483 group.cpp:656] Trying to get '/mesos/info_04' in ZooKeeper W0727 22:14:45.100443 33453 detector.cpp:444] Leading master master@127.0.0.1:5050 is using a Protobuf binary format when registering with ZooKeeper (info): this will be deprecated as of Mesos 0.24 (see 
MESOS-2340) I0727 22:14:45.100544 33453 detector.cpp:481] A new leading master (UPID= master@127.0.0.1:5050) is detected I0727 22:14:45.100739 33478 sched.cpp:254] New master detected at master@127.0.0.1:5050 I0727 22:14:45.101104 33478 sched.cpp:264] No credentials provided. Attempting to register without authentication E0727 22:14:45.101210 33490 socket.hpp:107] Shutdown failed on fd=88: Transport endpoint is not connected [107] E0727 22:14:45.101380 33490 socket.hpp:107] Shutdown failed on fd=89: Transport endpoint is not connected [107] E0727 22:14:46.643348 33490 socket.hpp:107] Shutdown failed on fd=88: Transport endpoint is not connected [107] E0727 22:14:47.111336 33490 socket.hpp:107] Shutdown failed on fd=88: Transport endpoint is not connected [107] 15/07/27 22:14:50 INFO DiskBlockManager: Shutdown hook called 15/07/27 22:14:50 INFO Utils: path = /tmp/spark-3f94442b-7873-463f-91dd-3ee62ed5b263/blockmgr-74a5ed25-025b-4186-b1d8-dc395f287a8f, already present as root for deletion. 15/07/27 22:14:50 INFO Utils: Shutdown hook called 15/07/27 22:14:50 INFO Utils: Deleting directory /tmp/spark-3f94442b-7873-463f-91dd-3ee62ed5b263/httpd-5d2a71e5-1d36-47f7-b122-31f1dd12a0f0
Spark Number of Partitions Recommendations
Hi All, I was wondering why the recommended number for parallelism is 2-3 times the number of cores in your cluster. Is the heuristic explained in any of the Spark papers? Or is it more of an agreed-upon rule of thumb? Thanks, Rahul P -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Number-of-Partitions-Recommendations-tp24022.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
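The heuristic is usually justified as load balancing: with exactly one partition per core, the slowest task determines the stage time, while 2-3 smaller partitions per core let faster cores pick up extra work and smooth out skew and stragglers. A small sketch of applying it without hard-coding the count:

// Size the partition count off the cores Spark reports rather than a literal number.
val targetPartitions = sc.defaultParallelism * 3

// Either ask for that many partitions when reading...
val data = sc.textFile("hdfs:///some/input", targetPartitions)

// ...or reshape an existing RDD.
val reshaped = data.repartition(targetPartitions)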
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
Thanks Owen, the problem under Cygwin is that when running spark-submit under 1.4.0, it simply reports: Error: Could not find or load main class org.apache.spark.launcher.Main. This is because under Cygwin, spark-class sets the LAUNCH_CLASSPATH to /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar, but the Windows java cannot recognize that classpath, so the command below simply errors out:

java -cp /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar org.apache.spark.launcher.Main
Error: Could not find or load main class org.apache.spark.launcher.Main

Thanks Proust

From: Sean Owen so...@cloudera.com To: Proust GZ Feng/China/IBM@IBMCN Cc: user user@spark.apache.org Date: 07/28/2015 02:20 PM Subject: Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
It wasn't removed, but rewritten. Cygwin is just a distribution of POSIX-related utilities, so you should be able to use the normal .sh scripts. In any event, you didn't say what the problem is?

On Tue, Jul 28, 2015 at 5:19 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Spark Users, It looks like Spark 1.4.0 cannot work with Cygwin due to the removal of Cygwin support in bin/spark-class. The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3 The changeset says "Add a library for launching Spark jobs programmatically", but how can it be used in Cygwin? I'm wondering whether any solutions are available to make it work in Windows? Thanks Proust
RE: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
Thanks Akhil.that solved now but below is the new stack trace. Don’t feel bad, am look into that but if it is there in your fingers please 15/07/28 09:03:31 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 77, ip-10-252-7-70.us-west-2.compute.internal): java.lang.Exception: Could not compute split, block input-0-1438074176218 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, July 28, 2015 2:30 PM To: Manohar Reddy Cc: user@spark.apache.org Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client You need to trigger an action on your rowrdd for it to execute the map, you can do a rowrdd.count() for that. Thanks Best Regards On Tue, Jul 28, 2015 at 2:18 PM, Manohar Reddy manohar.re...@happiestminds.commailto:manohar.re...@happiestminds.com wrote: Hi Akhil, Thanks for thereply.I found the root cause but don’t know how to solve this. Below is the cause.this map function not going inside to execute because of this all my list fields are empty. Please let me know what might be the cause to not execute this snippet of code.the below map is not execution not going inside. JavaRDDRow rowrdd=rdd.map(new FunctionMessageAndMetadata, Row() { @Override public Row call(MessageAndMetadata arg0) throws Exception { System.out.println(inside thread map ca); String[] data=new String(arg0.getPayload()).split(\\|); int i=0; for (String string : data) { if(i3){ if(i%2==0){ fields.add(DataTypes.createStructField(string, DataTypes.StringType, true)); System.out.println(string); }else{ listvalues.add(string); System.out.println(string); } From: Akhil Das [mailto:ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com] Sent: Tuesday, July 28, 2015 1:52 PM To: Manohar Reddy Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client Put a try catch inside your code and inside the catch print out the length or the list itself which causes the ArrayIndexOutOfBounds. It might happen that some of your data is not proper. 
Thanks Best Regards On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 manohar.re...@happiestminds.commailto:manohar.re...@happiestminds.com wrote: Hi Team, can please some body help me out what am doing wrong to get the below exception while running my app on Yarn cluster with spark 1.4. Kafka stream am getting AND DOING foreachRDD and giving it to new thread for process.please find the below code snippet. JavaDStreamMessageAndMetadata unionStreams = ReceiverLauncher.launch( jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY()); unionStreams .foreachRDD(new Function2JavaRDDlt;MessageAndMetadata, Time, Void() { @Override public Void call(JavaRDDMessageAndMetadata rdd, Time time) throws Exception { new ThreadParam(rdd).start();
Re: use S3-Compatible Storage with spark
With s3n, try this out: s3service.s3-endpoint - The host name of the S3 service. You should only ever change this value from the default if you need to contact an alternative S3 endpoint for testing purposes. Default: s3.amazonaws.com. Thanks Best Regards

On Tue, Jul 28, 2015 at 1:54 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I recompiled and retried; now it's looking like this with s3a: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain. S3n is working fine (the only problem is still the endpoint).
Re: Question abt serialization
Did you try it with just (comment out line 27):

println "Count of spark: " + file.filter({ s -> s.contains('spark') }).count()

Thanks Best Regards

On Sun, Jul 26, 2015 at 12:43 AM, tog guillaume.all...@gmail.com wrote: Hi, I have been using Spark for quite some time using either Scala or Python. I wanted to give Groovy a try through scripts for small tests. Unfortunately I get the following exception (using that simple script https://gist.github.com/galleon/d6540327c418aa8a479f). Is there anything I am not doing correctly here? Thanks tog

Groovy4Spark $ groovy GroovySparkWordcount.groovy
class org.apache.spark.api.java.JavaRDD
true
true
Caught: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.filter(RDD.scala:310)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
at org.apache.spark.api.java.JavaRDD$filter$0.call(Unknown Source)
at GroovySparkWordcount.run(GroovySparkWordcount.groovy:27)
Caused by: java.io.NotSerializableException: GroovySparkWordcount
Serialization stack:
- object not serializable (class: GroovySparkWordcount, value: GroovySparkWordcount@7eee6c13)
- field (class: GroovySparkWordcount$1, name: this$0, type: class GroovySparkWordcount)
- object (class GroovySparkWordcount$1, GroovySparkWordcount$1@15c16f19)
- field (class: org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, name: f$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, function1)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 12 more

-- PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
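The serialization stack shows the filter closure dragging in the enclosing GroovySparkWordcount instance (the this$0 field), which is not serializable. A sketch of the same failure and the usual fix, shown in Scala with illustrative names:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class WordCounter(sc: SparkContext) {     // not Serializable, like the Groovy script class
  val keyword = "spark"

  // Fails: the closure reads the field `keyword`, so it captures `this`.
  def badCount(file: RDD[String]): Long =
    file.filter(s => s.contains(keyword)).count()

  // Works: copy the field into a local val so only the String is captured.
  def goodCount(file: RDD[String]): Long = {
    val kw = keyword
    file.filter(s => s.contains(kw)).count()
  }
}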
Re: unsubscribe
NO! On Tue, Jul 28, 2015 at 5:03 PM, Harshvardhan Chauhan ha...@gumgum.com wrote: -- *Harshvardhan Chauhan* | Software Engineer *GumGum* http://www.gumgum.com/ | *Ads that stick* 310-260-9666 | ha...@gumgum.com
unsubscribe
-- *Harshvardhan Chauhan* | Software Engineer *GumGum* http://www.gumgum.com/ | *Ads that stick* 310-260-9666 | ha...@gumgum.com
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
Can you run the windows batch files (e.g. spark-submit.cmd) from the cygwin shell? On Tue, Jul 28, 2015 at 7:26 PM, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Owen Add back the cygwin classpath detection can pass the issue mentioned before, but there seems lack of further support in the launch lib, see below stacktrace LAUNCH_CLASSPATH: C:\spark-1.4.0-bin-hadoop2.3\lib\spark-assembly-1.4.0-hadoop2.3.0.jar java -cp *C:\spark-1.4.0-bin-hadoop2.3\lib\spark-assembly-1.4.0-hadoop2.3.0.jar* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --driver-class-path ../thirdparty/lib/db2-jdbc4-95fp6a/db2jcc4.jar --properties-file conf/spark.properties target/scala-2.10/price-scala-assembly-15.4.0-SNAPSHOT.jar Exception in thread main java.lang.IllegalStateException: Library directory '*C:\c\spark-1.4.0-bin-hadoop2.3\lib_managed\jars*' does not exist. at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:229) at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:215) at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:115) at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:192) at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(SparkSubmitCommandBuilder.java:117) at org.apache.spark.launcher.Main.main(Main.java:74) Thanks Proust From:Sean Owen so...@cloudera.com To:Proust GZ Feng/China/IBM@IBMCN Cc:user user@spark.apache.org Date:07/28/2015 06:54 PM Subject:Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0 -- Does adding back the cygwin detection and this clause make it work? if $cygwin; then CLASSPATH=`cygpath -wp $CLASSPATH` fi If so I imagine that's fine to bring back, if that's still needed. On Tue, Jul 28, 2015 at 9:49 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Thanks Owen, the problem under Cygwin is while run spark-submit under 1.4.0, it simply report Error: Could not find or load main class org.apache.spark.launcher.Main This is because under Cygwin spark-class make the LAUNCH_CLASSPATH as /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar But under Cygwin java in Windows cannot recognize the classpath, so below command simply error out java -cp /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar org.apache.spark.launcher.Main Error: Could not find or load main class org.apache.spark.launcher.Main Thanks Proust From:Sean Owen so...@cloudera.com To:Proust GZ Feng/China/IBM@IBMCN Cc:user user@spark.apache.org Date:07/28/2015 02:20 PM Subject:Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0 It wasn't removed, but rewritten. Cygwin is just a distribution of POSIX-related utilities so you should be able to use the normal .sh scripts. In any event, you didn't say what the problem is? On Tue, Jul 28, 2015 at 5:19 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Spark Users Looks like Spark 1.4.0 cannot work with Cygwin due to the removing of Cygwin support in bin/spark-class The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3 The changeset said Add a library for launching Spark jobs programmatically, but how to use it in Cygwin? I'm wondering any solutions available to make it work in Windows? 
Thanks Proust - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
Re: restart from last successful stage
Hi, I do not think the OP is asking about attempt failure but about stage failure finally leading to job failure. In that case the RDD info from the last run is gone, even if it was cached, isn't it? Ayan On 29 Jul 2015 07:01, Tathagata Das t...@databricks.com wrote: If you are using the same RDDs in both attempts to run the job, the previous stage outputs generated in the previous job will indeed be reused. This applies to core though. For DataFrames, depending on what you do, the physical plan may get generated again, leading to new RDDs, which may cause recomputing all the stages. Consider running the job by generating the RDD from the DataFrame and then using that. Of course, you can use caching in both core and DataFrames, which will solve all these concerns. On Tue, Jul 28, 2015 at 1:03 PM, Alex Nastetsky alex.nastet...@vervemobile.com wrote: Is it possible to restart the job from the last successful stage instead of from the beginning? For example, if your job has stages 0, 1 and 2 .. and stage 0 takes a long time and is successful, but the job fails on stage 1, it would be useful to be able to restart from the output of stage 0 instead of from the beginning. Note that I am NOT talking about Spark Streaming, just Spark Core (and DataFrames), not sure if the case would be different with Streaming. Thanks.
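To make the caching suggestion above concrete, a minimal Scala sketch (the DataFrame name and the two downstream jobs are illustrative assumptions, not code from this thread):

  import org.apache.spark.storage.StorageLevel

  val rowRdd = df.rdd                                      // pin the DataFrame's physical plan to one concrete RDD
  rowRdd.persist(StorageLevel.MEMORY_AND_DISK)             // or simply df.cache() before reusing the DataFrame
  val firstJob  = rowRdd.map(_.getInt(0)).sum()            // first job materializes the cached partitions
  val secondJob = rowRdd.filter(_.getInt(0) > 0).count()   // second job reuses them instead of recomputing the stages

Without the persist()/cache(), a second job over the same DataFrame may regenerate the plan and recompute every stage from the source.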
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
@Das, Is there any way to identify a kafka topic when we have a unified stream? As of now, for each topic I create a dedicated DStream and use foreachRDD on each of these streams. If I have, say, 100 kafka topics, then how can I use a unified stream and still take topic-specific actions inside foreachRDD? On Tue, Jul 28, 2015 at 6:42 PM, Tathagata Das t...@databricks.com wrote: I don't think anyone has really run 500 text streams. And the par sequences do nothing there, you are only parallelizing the setup code, which does not really compute anything. It also sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, and it will process all the new files in each batch interval in parallel. TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design? -- Thanks Regards, Ashwin Giridharan
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
Thank you Tathagata. My main use case for the 500 streams is to append new elements into their corresponding Spark SQL tables. Every stream is mapped to a table, so I'd like to use the streams to append the new RDDs to the table. If I union all the streams, appending new elements becomes a nightmare. So there is no other way to parallelize something like the following? Will this still run sequentially, or time out? //500 streams streams.foreach { stream => stream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.saveAsTable(streamTuple._1, SaveMode.Append) } } On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com wrote: I don't think anyone has really run 500 text streams. And the par sequences do nothing there, you are only parallelizing the setup code, which does not really compute anything. It also sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, and it will process all the new files in each batch interval in parallel. TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design?
Re: broadcast variable question
That's great! Thanks On Tuesday, July 28, 2015, Ted Yu yuzhih...@gmail.com wrote: If I understand correctly, there would be one value in the executor. Cheers On Tue, Jul 28, 2015 at 4:23 PM, Jonathan Coveney jcove...@gmail.com wrote: I am running in coarse grained mode, let's say with 8 cores per executor. If I use a broadcast variable, will all of the tasks in that executor share the same value? Or will each task broadcast its own value, i.e. in this case, would there be one value in the executor shared by the 8 tasks, or would there be eight copies, 1 per task?
Job hang when running random forest
Hi guys, A job hung for about 16 hours when I ran the random forest algorithm, and I don't know why that happened. I use spark 1.4.0 on yarn; here is the code http://apache-spark-user-list.1001560.n3.nabble.com/file/n24047/1.png and the following picture is from the spark ui http://apache-spark-user-list.1001560.n3.nabble.com/file/n24047/2.png Can anybody help? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Job-hang-when-running-random-forest-tp24047.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
Hi, Owen Adding back the cygwin classpath detection gets past the issue mentioned before, but there seems to be a lack of further support in the launch lib; see the stacktrace below LAUNCH_CLASSPATH: C:\spark-1.4.0-bin-hadoop2.3\lib\spark-assembly-1.4.0-hadoop2.3.0.jar java -cp C:\spark-1.4.0-bin-hadoop2.3\lib\spark-assembly-1.4.0-hadoop2.3.0.jar org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --driver-class-path ../thirdparty/lib/db2-jdbc4-95fp6a/db2jcc4.jar --properties-file conf/spark.properties target/scala-2.10/price-scala-assembly-15.4.0-SNAPSHOT.jar Exception in thread main java.lang.IllegalStateException: Library directory 'C:\c\spark-1.4.0-bin-hadoop2.3\lib_managed\jars' does not exist. at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:229) at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:215) at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:115) at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:192) at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(SparkSubmitCommandBuilder.java:117) at org.apache.spark.launcher.Main.main(Main.java:74) Thanks Proust From: Sean Owen so...@cloudera.com To: Proust GZ Feng/China/IBM@IBMCN Cc: user user@spark.apache.org Date: 07/28/2015 06:54 PM Subject: Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0 Does adding back the cygwin detection and this clause make it work? if $cygwin; then CLASSPATH=`cygpath -wp $CLASSPATH` fi If so I imagine that's fine to bring back, if that's still needed. On Tue, Jul 28, 2015 at 9:49 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Thanks Owen, the problem under Cygwin is that when running spark-submit under 1.4.0, it simply reports Error: Could not find or load main class org.apache.spark.launcher.Main This is because under Cygwin spark-class builds LAUNCH_CLASSPATH as /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar But under Cygwin, java on Windows cannot recognize that classpath, so the command below simply errors out java -cp /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar org.apache.spark.launcher.Main Error: Could not find or load main class org.apache.spark.launcher.Main Thanks Proust From: Sean Owen so...@cloudera.com To: Proust GZ Feng/China/IBM@IBMCN Cc: user user@spark.apache.org Date: 07/28/2015 02:20 PM Subject: Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0 It wasn't removed, but rewritten. Cygwin is just a distribution of POSIX-related utilities so you should be able to use the normal .sh scripts. In any event, you didn't say what the problem is? On Tue, Jul 28, 2015 at 5:19 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Spark Users It looks like Spark 1.4.0 cannot work with Cygwin due to the removal of Cygwin support in bin/spark-class The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3 The changeset says Add a library for launching Spark jobs programmatically, but how do I use it in Cygwin? I'm wondering if any solutions are available to make it work on Windows? Thanks Proust - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
Thanks Vanzin, spark-submit.cmd works Thanks Proust From: Marcelo Vanzin van...@cloudera.com To: Proust GZ Feng/China/IBM@IBMCN Cc: Sean Owen so...@cloudera.com, user user@spark.apache.org Date: 07/29/2015 10:35 AM Subject:Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0 Can you run the windows batch files (e.g. spark-submit.cmd) from the cygwin shell? On Tue, Jul 28, 2015 at 7:26 PM, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Owen Add back the cygwin classpath detection can pass the issue mentioned before, but there seems lack of further support in the launch lib, see below stacktrace LAUNCH_CLASSPATH: C:\spark-1.4.0-bin-hadoop2.3\lib\spark-assembly-1.4.0-hadoop2.3.0.jar java -cp C:\spark-1.4.0-bin-hadoop2.3\lib\spark-assembly-1.4.0-hadoop2.3.0.jar org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --driver-class-path ../thirdparty/lib/db2-jdbc4-95fp6a/db2jcc4.jar --properties-file conf/spark.properties target/scala-2.10/price-scala-assembly-15.4.0-SNAPSHOT.jar Exception in thread main java.lang.IllegalStateException: Library directory 'C:\c\spark-1.4.0-bin-hadoop2.3\lib_managed\jars' does not exist. at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:229) at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:215) at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:115) at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:192) at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(SparkSubmitCommandBuilder.java:117) at org.apache.spark.launcher.Main.main(Main.java:74) Thanks Proust From:Sean Owen so...@cloudera.com To:Proust GZ Feng/China/IBM@IBMCN Cc:user user@spark.apache.org Date:07/28/2015 06:54 PM Subject:Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0 Does adding back the cygwin detection and this clause make it work? if $cygwin; then CLASSPATH=`cygpath -wp $CLASSPATH` fi If so I imagine that's fine to bring back, if that's still needed. On Tue, Jul 28, 2015 at 9:49 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Thanks Owen, the problem under Cygwin is while run spark-submit under 1.4.0, it simply report Error: Could not find or load main class org.apache.spark.launcher.Main This is because under Cygwin spark-class make the LAUNCH_CLASSPATH as /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar But under Cygwin java in Windows cannot recognize the classpath, so below command simply error out java -cp /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar org.apache.spark.launcher.Main Error: Could not find or load main class org.apache.spark.launcher.Main Thanks Proust From:Sean Owen so...@cloudera.com To:Proust GZ Feng/China/IBM@IBMCN Cc:user user@spark.apache.org Date:07/28/2015 02:20 PM Subject:Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0 It wasn't removed, but rewritten. Cygwin is just a distribution of POSIX-related utilities so you should be able to use the normal .sh scripts. In any event, you didn't say what the problem is? 
On Tue, Jul 28, 2015 at 5:19 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Spark Users Looks like Spark 1.4.0 cannot work with Cygwin due to the removing of Cygwin support in bin/spark-class The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3 The changeset said Add a library for launching Spark jobs programmatically, but how to use it in Cygwin? I'm wondering any solutions available to make it work in Windows? Thanks Proust - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
@Ashwin: You could append the topic to the data. val kafkaStreams = topics.map { topic => KafkaUtils.createDirectStream(topic...).map { x => (x, topic) } } val unionedStream = context.union(kafkaStreams) @Brandon: I don't recommend it, but you could do something crazy like use the foreachRDD to farm out the jobs to a threadpool, where the final foreachRDD waits for all the jobs to complete. manyDStreams.foreach { dstream => dstream.foreachRDD { rdd => // Add a runnable that runs the job on the RDD to the threadpool // This does not wait for the job to finish } } anyOfTheManyDStreams.foreachRDD { _ => // wait for all the current batch's jobs in the threadpool to complete. } This would run all the Spark jobs in the batch in parallel in the thread pool, but it would also make sure all the jobs finish before the batch is marked as completed. On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com wrote: Thank you Tathagata. My main use case for the 500 streams is to append new elements into their corresponding Spark SQL tables. Every stream is mapped to a table, so I'd like to use the streams to append the new RDDs to the table. If I union all the streams, appending new elements becomes a nightmare. So there is no other way to parallelize something like the following? Will this still run sequentially, or time out? //500 streams streams.foreach { stream => stream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.saveAsTable(streamTuple._1, SaveMode.Append) } } On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com wrote: I don't think anyone has really run 500 text streams. And the par sequences do nothing there, you are only parallelizing the setup code, which does not really compute anything. It also sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, and it will process all the new files in each batch interval in parallel. TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design?
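To flesh out the threadpool idea above, a rough Scala sketch. The names streamsWithTables, sqlContext and the pool size are assumptions for illustration; the key point is that each foreachRDD only submits a Future and returns, and one extra foreachRDD acts as the barrier for the batch:

  import java.util.concurrent.Executors
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration._
  import org.apache.spark.sql.SaveMode

  // driver-side pool; foreachRDD bodies run one after another on the driver, so the var below is safe there
  implicit val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))
  var pending = Seq.empty[Future[Unit]]

  streamsWithTables.foreach { case (stream, table) =>
    stream.foreachRDD { rdd =>
      // submit the Spark job asynchronously and return immediately
      pending = pending :+ Future { sqlContext.jsonRDD(rdd).saveAsTable(table, SaveMode.Append) }
    }
  }

  // registered last: this output operation blocks until every job of the current batch has finished
  streamsWithTables.head._1.foreachRDD { _ =>
    Await.ready(Future.sequence(pending), 10.minutes)
    pending = Seq.empty
  }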
Job hang when running random forest
Hi guys, When I ran the random forest algorithm, a job hung for 15.8h and I cannot figure out why that happened. Here is the code http://apache-spark-user-list.1001560.n3.nabble.com/file/n24046/%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7_2015-07-29_%E4%B8%8A%E5%8D%8810.png And I use spark 1.4.0 on yarn http://apache-spark-user-list.1001560.n3.nabble.com/file/n24046/%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7_2015-07-29_%E4%B8%8A%E5%8D%8810.png Can anyone help? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Job-hang-when-running-random-forest-tp24046.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
I don't think anyone has really run 500 text streams. And the par sequences do nothing there, you are only parallelizing the setup code, which does not really compute anything. It also sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, and it will process all the new files in each batch interval in parallel. TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design?
Re: Spark Streaming Json file groupby function
Hi TD, We have a requirement to maintain the user session state and to maintain/update the metrics for minute, day and hour granularities for a user session in our Streaming job. Can I keep those granularities in the state and recalculate each time there is a change? How would the performance be impacted? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p24041.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
broadcast variable question
i am running in coarse grained mode, let's say with 8 cores per executor. If I use a broadcast variable, will all of the tasks in that executor share the same value? Or will each task broadcast its own value ie in this case, would there be one value in the executor shared by the 8 tasks, or would there be eight copies, 1 per task?
Re: broadcast variable question
If I understand correctly, there would be one value in the executor. Cheers On Tue, Jul 28, 2015 at 4:23 PM, Jonathan Coveney jcove...@gmail.com wrote: i am running in coarse grained mode, let's say with 8 cores per executor. If I use a broadcast variable, will all of the tasks in that executor share the same value? Or will each task broadcast its own value ie in this case, would there be one value in the executor shared by the 8 tasks, or would there be eight copies, 1 per task?
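A tiny sketch to illustrate the point (the lookup data is made up): the broadcast value is shipped once per executor, and every task running on that executor reads the same local copy rather than getting its own.

  val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))     // small lookup table, illustrative only
  val total = sc.parallelize(Seq("a", "b", "a", "c"))
    .map(k => lookup.value.getOrElse(k, 0))              // all 8 tasks on an executor share this one copy
    .sum()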
Re: Spark Streaming Json file groupby function
If you are trying to keep such long term state, it will be more robust in the long term to use a dedicated data store (cassandra/HBase/etc.) that is designed for long term storage. On Tue, Jul 28, 2015 at 4:37 PM, swetha swethakasire...@gmail.com wrote: Hi TD, We have a requirement to maintain the user session state and to maintain/update the metrics for minute, day and hour granularities for a user session in our Streaming job. Can I keep those granularities in the state and recalculate each time there is a change? How would the performance be impacted? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p24041.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Getting java.net.BindException when attempting to start Spark master on EC2 node with public IP
Can you show the full stack trace ? Which Spark release are you using ? Thanks On Jul 27, 2015, at 10:07 AM, Wayne Song wayne.e.s...@gmail.com wrote: Hello, I am trying to start a Spark master for a standalone cluster on an EC2 node. The CLI command I'm using looks like this: Note that I'm specifying the --host argument; I want my Spark master to be listening on a specific IP address. The host that I'm specifying (i.e. 54.xx.xx.xx) is the public IP for my EC2 node; I've confirmed that nothing else is listening on port 7077 and that my EC2 security group has all ports open. I've also double-checked that the public IP is correct. When I use --host 54.xx.xx.xx, I get the following error message: This does not occur if I leave out the --host argument and it doesn't occur if I use --host 10.0.xx.xx, where 10.0.xx.xx is my private EC2 IP address. Why would Spark fail to bind to a public EC2 address? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-java-net-BindException-when-attempting-to-start-Spark-master-on-EC2-node-with-public-IP-tp24011.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Multiple operations on same DStream in Spark Streaming
Is this average supposed to be across all partitions? If so, it will require some one of the reduce operations in every batch interval. If that's too slow for the data rate, I would investigate using PairDStreamFunctions.updateStateByKey to compute the sum + count of the 2nd integers, per 1st integer, then do the filtering and final averaging downstream if you can, i.e., where you actually need the final value. If you need it on every batch iteration, then you'll have to do a reduce per iteration. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Jul 28, 2015 at 3:14 AM, Akhil Das ak...@sigmoidanalytics.com wrote: One approach would be to store the batch data in an intermediate storage (like HBase/MySQL or even in zookeeper), and inside your filter function you just go and read the previous value from this storage and do whatever operation that you are supposed to do. Thanks Best Regards On Sun, Jul 26, 2015 at 3:37 AM, foobar heath...@fb.com wrote: Hi I'm working with Spark Streaming using scala, and trying to figure out the following problem. In my DStream[(int, int)], each record is an int pair tuple. For each batch, I would like to filter out all records with first integer below average of first integer in this batch, and for all records with first integer above average of first integer in the batch, compute the average of second integers in such records. What's the best practice to implement this? I tried this but kept getting the object not serializable exception because it's hard to share variables (such as average of first int in the batch) between workers and driver. Any suggestions? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-operations-on-same-DStream-in-Spark-Streaming-tp23995.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
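A minimal sketch of the per-batch reduce described above, assuming the DStream[(Int, Int)] from the original question is called pairs; note that the batch average is just a local Double computed on the driver, so capturing it in the filter closure avoids the "object not serializable" problem:

  pairs.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val firstAvg = rdd.map(_._1.toDouble).mean()         // average of the first integers in this batch
      val above = rdd.filter(_._1 > firstAvg)              // firstAvg is a plain Double, safe to capture
      if (!above.isEmpty()) {
        val secondAvg = above.map(_._2.toDouble).mean()    // average of the second integers among those records
        println(s"avg1 = $firstAvg, avg2 above avg1 = $secondAvg")
      }
    }
  }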
Re: Is spark suitable for real time query
You can try out a few tricks employed by folks at Lynx Analytics... Daniel Darabos gave some details at Spark Summit: https://www.youtube.com/watch?v=zt1LdVj76LU&index=13&list=PL-x35fyliRwhP52fwDqULJLOnqnrN5nDs On 22.7.2015. 17:00, Louis Hust wrote: My code is like below: Map<String, String> t11opt = new HashMap<String, String>(); t11opt.put("url", DB_URL); t11opt.put("dbtable", "t11"); DataFrame t11 = sqlContext.load("jdbc", t11opt); t11.registerTempTable("t11"); ...the same for t12, t21, t22 DataFrame t1 = t11.unionAll(t12); t1.registerTempTable("t1"); DataFrame t2 = t21.unionAll(t22); t2.registerTempTable("t2"); for (int i = 0; i < 10; i++) { System.out.println(new Date(System.currentTimeMillis())); DataFrame crossjoin = sqlContext.sql("select txt from t1 join t2 on t1.id = t2.id"); crossjoin.show(); System.out.println(new Date(System.currentTimeMillis())); } where t11, t12, t21, t22 are all DataFrames loaded via jdbc from a MySQL database that is local to the spark job. But each loop takes about 3 seconds to execute; I do not know why it costs so much time. 2015-07-22 19:52 GMT+08:00 Robin East robin.e...@xense.co.uk: Here’s an example using spark-shell on my laptop: sc.textFile("LICENSE").filter(_ contains "Spark").count This takes less than a second the first time I run it and is instantaneous on every subsequent run. What code are you running? On 22 Jul 2015, at 12:34, Louis Hust louis.h...@gmail.com wrote: I did a simple test using spark in standalone mode (not cluster), and found a simple action takes a few seconds; the data size is small, just a few rows. So will each spark job cost some time for init or prepare work no matter what the job is? I mean, will the basic framework of a spark job cost seconds? 2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk: Real-time is, of course, relative but you’ve mentioned microsecond level. Spark is designed to process large amounts of data in a distributed fashion. No distributed system I know of could give any kind of guarantees at the microsecond level. Robin On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote: Hi, all I am using the spark jar in standalone mode, fetching data from different mysql instances and doing some actions, but I found the time is at the second level. So I want to know if a spark job is suitable for real time queries which need microseconds?
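One thing worth checking, not raised in the thread above: without caching, every iteration of that loop reloads t11/t12/t21/t22 from MySQL over JDBC and re-plans the join, which alone can account for seconds per query. A hedged sketch (same table names as in the question):

  sqlContext.cacheTable("t1")                           // cache the unioned tables once
  sqlContext.cacheTable("t2")
  sqlContext.sql("select count(*) from t1").show()      // force materialization
  sqlContext.sql("select count(*) from t2").show()
  // subsequent iterations of the join then read from memory instead of going back to MySQL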
Re: Spark - Eclipse IDE - Maven
Sorry about self-promotion, but there's a really nice tutorial for setting up Eclipse for Spark in Spark in Action book: http://www.manning.com/bonaci/ On 27.7.2015. 10:22, Akhil Das wrote: You can follow this doc https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IDESetup Thanks Best Regards On Fri, Jul 24, 2015 at 10:56 AM, Siva Reddy ksiv...@gmail.com mailto:ksiv...@gmail.com wrote: Hi All, I am trying to setup the Eclipse (LUNA) with Maven so that I create a maven projects for developing spark programs. I am having some issues and I am not sure what is the issue. Can Anyone share a nice step-step document to configure eclipse with maven for spark development. Thanks Siva -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Eclipse-IDE-Maven-tp23977.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org
Re: Multiple operations on same DStream in Spark Streaming
One approach would be to store the batch data in an intermediate storage (like HBase/MySQL or even in zookeeper), and inside your filter function you just go and read the previous value from this storage and do whatever operation that you are supposed to do. Thanks Best Regards On Sun, Jul 26, 2015 at 3:37 AM, foobar heath...@fb.com wrote: Hi I'm working with Spark Streaming using scala, and trying to figure out the following problem. In my DStream[(int, int)], each record is an int pair tuple. For each batch, I would like to filter out all records with first integer below average of first integer in this batch, and for all records with first integer above average of first integer in the batch, compute the average of second integers in such records. What's the best practice to implement this? I tried this but kept getting the object not serializable exception because it's hard to share variables (such as average of first int in the batch) between workers and driver. Any suggestions? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-operations-on-same-DStream-in-Spark-Streaming-tp23995.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Checkpoints in SparkStreaming
I'm using SparkStreaming and I want to configure checkpointing to manage fault-tolerance. I've been reading the documentation. Is it necessary to create and configure the InputDStream inside the getOrCreate function? I checked the example in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala and it looks like it does everything inside of the function. Should I put all the logic of the application inside it?? I think that's not the way... If I just create the context I get an error: Exception in thread main org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e12a5a6 has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) I'm not very good with Scala.. the code that I wrote: def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app") val ssc = new StreamingContext(sparkConf, Seconds(5)) // new context ssc.checkpoint("/tmp/spark/metricsCheckpoint") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint", functionToCreateContext _) val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0)) val topics = args(1).split("\\,") val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet) directKafkaStream.foreachRDD { rdd => ...
Re: Question abt serialization
Hi Akhil I have it working now with Groovy REPL in a form similar to the one you are mentionning. Still I dont understand why the previous form (with the Function) is raising that exception. Cheers Guillaume On 28 July 2015 at 08:56, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try it with just: (comment out line 27) println Count of spark: + file.filter({s - s.contains('spark')}). count() Thanks Best Regards On Sun, Jul 26, 2015 at 12:43 AM, tog guillaume.all...@gmail.com wrote: Hi I have been using Spark for quite some time using either scala or python. I wanted to give a try to groovy through scripts for small tests. Unfortunately I get the following exception (using that simple script https://gist.github.com/galleon/d6540327c418aa8a479f) Is there anything I am not doing correctly here. Thanks tog Groovy4Spark $ groovy GroovySparkWordcount.groovy class org.apache.spark.api.java.JavaRDD true true Caught: org.apache.spark.SparkException: Task not serializable org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1893) at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311) at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.filter(RDD.scala:310) at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78) at org.apache.spark.api.java.JavaRDD$filter$0.call(Unknown Source) at GroovySparkWordcount.run(GroovySparkWordcount.groovy:27) Caused by: java.io.NotSerializableException: GroovySparkWordcount Serialization stack: - object not serializable (class: GroovySparkWordcount, value: GroovySparkWordcount@7eee6c13) - field (class: GroovySparkWordcount$1, name: this$0, type: class GroovySparkWordcount) - object (class GroovySparkWordcount$1, GroovySparkWordcount$1@15c16f19) - field (class: org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, name: f$1, type: interface org.apache.spark.api.java.function.Function) - object (class org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, function1) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312) ... 12 more -- PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net -- PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
Re: Getting java.net.BindException when attempting to start Spark master on EC2 node with public IP
Did you try binding to 0.0.0.0? Thanks Best Regards On Mon, Jul 27, 2015 at 10:37 PM, Wayne Song wayne.e.s...@gmail.com wrote: Hello, I am trying to start a Spark master for a standalone cluster on an EC2 node. The CLI command I'm using looks like this: Note that I'm specifying the --host argument; I want my Spark master to be listening on a specific IP address. The host that I'm specifying (i.e. 54.xx.xx.xx) is the public IP for my EC2 node; I've confirmed that nothing else is listening on port 7077 and that my EC2 security group has all ports open. I've also double-checked that the public IP is correct. When I use --host 54.xx.xx.xx, I get the following error message: This does not occur if I leave out the --host argument and it doesn't occur if I use --host 10.0.xx.xx, where 10.0.xx.xx is my private EC2 IP address. Why would Spark fail to bind to a public EC2 address? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-java-net-BindException-when-attempting-to-start-Spark-master-on-EC2-node-with-public-IP-tp24011.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
You need to trigger an action on your rowrdd for it to execute the map, you can do a rowrdd.count() for that. Thanks Best Regards On Tue, Jul 28, 2015 at 2:18 PM, Manohar Reddy manohar.re...@happiestminds.com wrote: Hi Akhil, Thanks for thereply.I found the root cause but don’t know how to solve this. Below is the cause.this map function not going inside to execute because of this all my list fields are empty. Please let me know what might be the cause to not execute this snippet of code*.the below map is not execution not going inside.* JavaRDDRow rowrdd=*rdd**.map(**new** FunctionMessageAndMetadata, Row() {* *@Override* *public** Row call(MessageAndMetadata **arg0**) **throws** Exception {* * System.**out**.println(**inside thread map ca**);* *String[] **data**=**new** String(**arg0* *.getPayload()).split(**\\|**);* *int* *i**=0;* *for** (String* string : data) { *if*(i3){ *if*(i%2==0){ fields.add(DataTypes. *createStructField*(string, DataTypes.*StringType*, *true*)); System.*out*.println(string); }*else*{ listvalues.add(string); System.*out*.println(string); } *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Tuesday, July 28, 2015 1:52 PM *To:* Manohar Reddy *Cc:* user@spark.apache.org *Subject:* Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client Put a try catch inside your code and inside the catch print out the length or the list itself which causes the ArrayIndexOutOfBounds. It might happen that some of your data is not proper. Thanks Best Regards On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 manohar.re...@happiestminds.com wrote: Hi Team, can please some body help me out what am doing wrong to get the below exception while running my app on Yarn cluster with spark 1.4. Kafka stream am getting AND DOING foreachRDD and giving it to new thread for process.please find the below code snippet. JavaDStreamMessageAndMetadata unionStreams = ReceiverLauncher.launch( jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY()); unionStreams .foreachRDD(new Function2JavaRDDlt;MessageAndMetadata, Time, Void() { @Override public Void call(JavaRDDMessageAndMetadata rdd, Time time) throws Exception { new ThreadParam(rdd).start(); return null; } }); # public ThreadParam(JavaRDDMessageAndMetadata rdd) { this.rdd = rdd; // this.context=context; } public void run(){ final ListStructField fields = new ArrayListStructField(); ListString listvalues=new ArrayList(); final ListString meta=new ArrayList(); JavaRDDRow rowrdd=rdd.map(new FunctionMessageAndMetadata, Row() { @Override public Row call(MessageAndMetadata arg0) throws Exception { String[] data=new String(arg0.getPayload()).split(\\|); int i=0; ListStructField fields = new ArrayListStructField(); ListString listvalues=new ArrayList(); ListString meta=new ArrayList(); for (String string : data) { if(i3){ if(i%2==0){ fields.add(DataTypes.createStructField(string, DataTypes.StringType, true)); // System.out.println(splitarr[i]); }else{ listvalues.add(string); // System.out.println(splitarr[i]); } }else{ meta.add(string); } i++; }int size=listvalues.size(); return RowFactory.create(listvalues.get(25-25),listvalues.get(25-24),listvalues.get(25-23), listvalues.get(25-22),listvalues.get(25-21),listvalues.get(25-20), listvalues.get(25-19),listvalues.get(25-18),listvalues.get(25-17), listvalues.get(25-16),listvalues.get(25-15),listvalues.get(25-14),
Re: use S3-Compatible Storage with spark
I tried those 3 possibilities, and everything is still working as before => the endpoint param is not being picked up: sc.hadoopConfiguration.set("s3service.s3-endpoint", "test") sc.hadoopConfiguration.set("fs.s3n.endpoint", "test") sc.hadoopConfiguration.set("fs.s3n.s3-endpoint", "test") 2015-07-28 10:28 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: With s3n try this out: s3service.s3-endpoint The host name of the S3 service. You should only ever change this value from the default if you need to contact an alternative S3 endpoint for testing purposes. Default: s3.amazonaws.com Thanks Best Regards On Tue, Jul 28, 2015 at 1:54 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, recompiled and retried, now it's looking like this with s3a: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain S3n is working fine (the only problem is still the endpoint)
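For what it's worth, a hedged sketch of a configuration that usually does take effect (assumptions, not verified in this thread): with the s3a filesystem (Hadoop 2.6+ jars on the classpath) the endpoint key is fs.s3a.endpoint, whereas for s3n the endpoint is typically read by JetS3t from a jets3t.properties file (s3service.s3-endpoint) on the classpath rather than from hadoopConfiguration:

  // illustrative host and credential variables, not real values
  sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.example.com")
  sc.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
  sc.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
  val lines = sc.textFile("s3a://some-bucket/some/path")   // should now go to the custom endpoint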
Re: spark streaming get kafka individual message's offset and partition no
You don't have to use some other package in order to get access to the offsets. Shushant, have you read the available documentation at http://spark.apache.org/docs/latest/streaming-kafka-integration.html https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md or watched https://www.youtube.com/watch?v=fXnNEq1v3VA The kafka partition number is the same as the spark partition number. The messages for a given partition are in offset order without gaps, so you can use the offset range to determine the offset for a given message. Or you can use the messageHandler argument to KafkaUtils.createDirectStream to get access to all of the MessageAndMetadata, including partition and offset, on a per-message basis. On Tue, Jul 28, 2015 at 7:48 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am processing kafka messages using spark streaming 1.3. I am using the mapPartitions function to process kafka messages. How can I access the offset no of an individual message being processed? JavaPairInputDStream<byte[], byte[]> directKafkaStream = KafkaUtils.createDirectStream(..); directKafkaStream.mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>() { public Iterable<String> call(Iterator<Tuple2<byte[], byte[]>> t) throws Exception { while(t.hasNext()){ Tuple2<byte[], byte[]> tuple = t.next(); byte[] key = tuple._1(); byte[] msg = tuple._2(); ///how to get kafka partition no and offset of this message } } });
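A Scala sketch of the two options described above (kafkaParams, topics and fromOffsets are assumed to be defined elsewhere):

  import kafka.serializer.StringDecoder
  import kafka.message.MessageAndMetadata
  import org.apache.spark.TaskContext
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

  // Option 1: offset ranges per partition (spark partition == kafka partition for the direct stream)
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  stream.foreachRDD { rdd =>
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition { iter =>
      val range = ranges(TaskContext.get.partitionId)
      var offset = range.fromOffset
      iter.foreach { case (key, value) =>
        // `offset` is this message's kafka offset; range.topic / range.partition say where it came from
        offset += 1
      }
    }
  }

  // Option 2: keep (message, partition, offset) per record via the messageHandler argument
  val withMeta = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long)](
    ssc, kafkaParams, fromOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.message, mmd.partition, mmd.offset))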
Re: Which directory contains third party libraries for Spark
Can you show us the snippet of the exception stack ? Thanks On Jul 27, 2015, at 10:22 PM, Stephen Boesch java...@gmail.com wrote: when using spark-submit: which directory contains third party libraries that will be loaded on each of the slaves? I would like to scp one or more libraries to each of the slaves instead of shipping the contents in the application uber-jar. Note: I did try adding to $SPARK_HOME/lib_managed/jars. But the spark-submit still results in a ClassNotFoundException for classes included in the added library. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: log file directory
Path to the log file should be displayed when you launch the master, e.g. /mnt/var/log/apps/spark-hadoop-org.apache.spark.deploy.master.Master-MACHINENAME.out On Mon, Jul 27, 2015 at 11:28 PM, Jack Yang j...@uow.edu.au wrote: Hi all, I have a question regarding the log file directory. Say if I run “spark-submit --master local[4]”, where is the log file? Then what about if I run standalone “spark-submit --master spark://mymaster:7077”? Best regards, Jack
Re: Checkpoints in SparkStreaming
Yes, you need to follow the documentation. Configure your stream, including the transformations made to it, inside the getOrCreate function. On Tue, Jul 28, 2015 at 3:14 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I'm using SparkStreaming and I want to configure checkpointing to manage fault-tolerance. I've been reading the documentation. Is it necessary to create and configure the InputDStream inside the getOrCreate function? I checked the example in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala and it looks like it does everything inside of the function. Should I put all the logic of the application inside it?? I think that's not the way... If I just create the context I get an error: Exception in thread main org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e12a5a6 has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) I'm not very good with Scala.. the code that I wrote: def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app") val ssc = new StreamingContext(sparkConf, Seconds(5)) // new context ssc.checkpoint("/tmp/spark/metricsCheckpoint") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint", functionToCreateContext _) val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0)) val topics = args(1).split("\\,") val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet) directKafkaStream.foreachRDD { rdd => ...
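Putting the advice together, a minimal corrected sketch of the code from the question: everything that builds the DStream graph (the direct stream and its foreachRDD) moves inside the factory function, so getOrCreate can either run it fresh or restore the graph from the checkpoint. Here brokers and topics stand in for args(0) / args(1):

  def functionToCreateContext(): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("/tmp/spark/metricsCheckpoint")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics.toSet)
    directKafkaStream.foreachRDD { rdd =>
      // processing logic lives here, inside the factory
    }
    ssc
  }

  val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint", functionToCreateContext _)
  ssc.start()
  ssc.awaitTermination()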
Re: Clustetr setup for SPARK standalone application:
When you say you installed Spark, did you install the master and slave services for standalone mode as described here http://spark.apache.org/docs/latest/spark-standalone.html? If you intended to run Spark on Hadoop, see here http://spark.apache.org/docs/latest/running-on-yarn.html. It looks like either the master service isn't running or isn't reachable over your network. Is hadoopm0 publicly routable? Is port 7077 blocked? As a test, can you telnet to it? telnet hadoopm0 7077 Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Jul 28, 2015 at 7:01 AM, Sagar sagarjad...@yonsei.ac.kr wrote: Hello Sir, I am MS student working on SPARK. I am totally new in this field. I have install the spark. The local spark-shell is working fine. But whenever I tried the Master configuration I got some errors. When I run this command ; MASTER=spark://hadoopm0:7077 spark-shell I gets the errors likes; 15/07/27 21:17:26 INFO AppClient$ClientActor: Connecting to master spark://hadoopm0:7077... 15/07/27 21:17:46 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. 15/07/27 21:17:46 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet. 15/07/27 21:17:46 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up. Also I have attached the my screenshot of Master UI. Can you please give me some references, documentations or how to solve this issue. Thanks in advance. Thanking You, -- [image: Avast logo] http://www.avast.com/ This email has been checked for viruses by Avast antivirus software. www.avast.com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Checkpoint issue in spark streaming
Hi all. I am writing a twitter connector using spark streaming. I have written the following code to maintain a checkpoint: val ssc = StreamingContext.getOrCreate("hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpoint", () => { managingContext() }) def managingContext(): StreamingContext = { // making spark context val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector") val ssc = new StreamingContext(conf, Seconds(1)) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) import sqlContext.implicits._ // checkpointing ssc.checkpoint("hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpoint") ssc } but it gives the following error: java.lang.IllegalArgumentException: requirement failed: WindowedDStream has been marked for checkpointing but the storage level has not been set to enable persisting. Please use DStream.persist() to set the storage level to use memory for better checkpointing performance. I have also set the storage level through the following code: TwitterUtils.createStream(ssc, None, null, StorageLevel.MEMORY_AND_DISK_2) Can anyone help me in this regard? Thanks :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Checkpoint-issue-in-spark-streaming-tp24031.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
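Note that the storage level passed to TwitterUtils.createStream only applies to the receiver's input stream; the error refers to a windowed stream derived from it. A hedged sketch of what the error message asks for (the window() call and its durations are assumptions about the surrounding code, which isn't shown in the question):

  val tweets = TwitterUtils.createStream(ssc, None, Nil, StorageLevel.MEMORY_AND_DISK_2)
  val windowed = tweets.window(Minutes(10), Seconds(30))   // illustrative window/slide durations
  windowed.persist(StorageLevel.MEMORY_ONLY_SER)           // explicit storage level on the DStream being checkpointed
  windowed.checkpoint(Seconds(30))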
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
That stacktrace looks like an out of heap space on the driver while writing checkpoint, not on the worker nodes. How much memory are you giving the driver? How big are your stored checkpoints? On Tue, Jul 28, 2015 at 9:30 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hi, After using KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder, Option[AnalyticEventEnriched]](ssc, kafkaParams, map, messageHandler), I'm encountering the following issue: 15/07/28 00:29:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-24] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Java heap space at java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2351) at java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2276) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1428) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:137) at scala.collection.mutable.HashMap$$anonfun$writeObject$1.apply(HashMap.scala:135) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashTable$class.serializeTo(HashTable.scala:124) at scala.collection.mutable.HashMap.serializeTo(HashMap.scala:39) at scala.collection.mutable.HashMap.writeObject(HashMap.scala:135) at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:128) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137) at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123) at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) I don't know why, after that, it's eating all the CPU on one of the node till the entire job stopped. It tries to resume from checkpoint several times but failed with this error too. I think I have enough spared memory with 4 nodes with 24 Gb per nodes. 
It has processed successfully around 40 gb before that and looking into storage in Spark UI, I don't have a big rdd stored in memory/disk. I notice on this node, there's an increase in connection to kafka that are not closed too. Regards, Nicolas P. On Fri, Jul 24, 2015 at 3:32 PM, Cody Koeninger c...@koeninger.org wrote: It's really a question of whether you need access to the MessageAndMetadata, or just the key / value from the message. If you just need the key/value, dstream map is fine. In your case, since you need to be able to control a possible failure when deserializing the message from the MessageAndMetadata, I'd just go ahead and do the work in the messageHandler. On Fri, Jul 24, 2015 at 2:46 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, I manage to read all my data back with skipping offset that contains a corrupt message. I have one more question regarding messageHandler method vs dstream.foreachRDD.map vs dstream.map.foreachRDD best practices. I'm using a function to read the serialized message from kafka and convert it into my appropriate object with some enrichments and sometimes add filter after that. Where's the best spot to put this logic inside messageHandler method (convert each message within this handler) or dstream.foreachRDD.map (map rdd) or dstream.map.foreachRDD (map dstream) ?
Fwd: Writing streaming data to cassandra creates duplicates
Hi TD, Thanks for the info. I have a scenario like this: I am reading the data from a kafka topic. Let's say kafka has 3 partitions for the topic. In my streaming application, I would configure 3 receivers with 1 thread each so that they receive 3 dstreams (from the 3 partitions of the kafka topic), and I also implement a partitioner. Now there is a possibility of receiving a message with the same primary key twice or more: once when the message is created, and again whenever any of its fields is updated. If two messages M1 and M2 with the same primary key are read by 2 receivers, then even with the partitioner in spark they would still end up being processed in parallel, as they are in different dstreams. How do we address this situation? Thanks, Padma Ch On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com wrote: You have to partition that data in Spark Streaming by the primary key, and then make sure you insert data into Cassandra atomically per key, or per set of keys in the partition. You can use the combination of the (batch time, and partition Id) of the RDD inside foreachRDD as the unique id for the data you are inserting. This will guard against multiple attempts to run the task that inserts into Cassandra. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations TD On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, I have a problem when writing streaming data to cassandra. Our existing product is on an Oracle DB in which, while writing data, locks are maintained such that duplicates in the DB are avoided. But as spark has a parallel processing architecture, if more than 1 thread is trying to write the same data, i.e. with the same primary key, is there any scope to create duplicates? If yes, how do we address this problem, either from the spark side or from the cassandra side? Thanks, Padma Ch
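A rough Scala sketch of the suggestion above. messages, Message, pickLatest and upsert are hypothetical names, and the write itself is a placeholder rather than a real connector call; the point is that duplicates of a key are collapsed onto one partition, and each write carries a (batch time, partition id) attempt id:

  import org.apache.spark.TaskContext

  messages                                               // DStream[Message], hypothetical
    .map(m => (m.primaryKey, m))                         // key by primary key
    .foreachRDD { (rdd, batchTime) =>
      rdd.reduceByKey(pickLatest)                        // all versions of a key meet in one partition; keep the newest
         .foreachPartition { iter =>
           // unique per (batch, partition): a re-run of the task reuses the same id, so retries stay idempotent
           val attemptId = (batchTime.milliseconds, TaskContext.get.partitionId)
           iter.foreach { case (key, msg) => upsert(key, msg, attemptId) }   // placeholder write to Cassandra
         }
    }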
Re: Spark - Eclipse IDE - Maven
Sorry about self-promotion, but there's a really nice tutorial for setting up Eclipse for Spark in Spark in Action book: http://www.manning.com/bonaci/ On 24.7.2015. 7:26, Siva Reddy wrote: Hi All, I am trying to setup the Eclipse (LUNA) with Maven so that I create a maven projects for developing spark programs. I am having some issues and I am not sure what is the issue. Can Anyone share a nice step-step document to configure eclipse with maven for spark development. Thanks Siva -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Eclipse-IDE-Maven-tp23977.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Generalised Spark-HBase integration
Hi Ted, yes, the cloudera blog and your code were my starting point - but I needed something more spark-centric rather than hbase-centric. Basically I am doing a lot of ad-hoc transformations with RDDs that were based on HBase tables and then mutating them after a series of iterative (bsp-like) steps. On 28 July 2015 at 17:06, Ted Malaska ted.mala...@cloudera.com wrote: Thanks Michal, Just to share what I'm working on in a related topic. So a long time ago I built SparkOnHBase and put it into Cloudera Labs in this link. http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Also recently I am working on getting this into HBase core. It will hopefully be in HBase core within the next couple of weeks. https://issues.apache.org/jira/browse/HBASE-13992 Then I'm planning on adding dataframe and bulk load support through https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 Also, if you are interested, this is running today at at least half a dozen companies with Spark Streaming. Here is one blog post of a successful implementation http://blog.cloudera.com/blog/2015/03/how-edmunds-com-used-spark-streaming-to-build-a-near-real-time-dashboard/ Also here is an additional example blog I put together http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ Let me know if you have any questions, also let me know if you want to connect to join efforts. Ted Malaska On Tue, Jul 28, 2015 at 11:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, for the last couple of months I've been working on large graph analytics and along the way have written from scratch a HBase-Spark integration, as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Re: Is spark suitable for real time query
You can try out a few tricks employed by folks at Lynx Analytics... Daniel Darabos gave some details at Spark Summit: https://www.youtube.com/watch?v=zt1LdVj76LU&index=13&list=PL-x35fyliRwhP52fwDqULJLOnqnrN5nDs On 22.7.2015. 17:00, Louis Hust wrote: My code is like below: Map<String, String> t11opt = new HashMap<String, String>(); t11opt.put("url", DB_URL); t11opt.put("dbtable", "t11"); DataFrame t11 = sqlContext.load("jdbc", t11opt); t11.registerTempTable("t11"); ...the same for t12, t21, t22 DataFrame t1 = t11.unionAll(t12); t1.registerTempTable("t1"); DataFrame t2 = t21.unionAll(t22); t2.registerTempTable("t2"); for (int i = 0; i < 10; i++) { System.out.println(new Date(System.currentTimeMillis())); DataFrame crossjoin = sqlContext.sql("select txt from t1 join t2 on t1.id = t2.id"); crossjoin.show(); System.out.println(new Date(System.currentTimeMillis())); } Where t11, t12, t21, t22 are all table DataFrames loaded via JDBC from a MySQL database which is local to the Spark job. But each loop takes about 3 seconds to execute. I do not know why it costs so much time. 2015-07-22 19:52 GMT+08:00 Robin East robin.e...@xense.co.uk: Here’s an example using spark-shell on my laptop: sc.textFile("LICENSE").filter(_ contains "Spark").count This takes less than a second the first time I run it and is instantaneous on every subsequent run. What code are you running? On 22 Jul 2015, at 12:34, Louis Hust louis.h...@gmail.com wrote: I did a simple test using Spark in standalone mode (not cluster), and found a simple action takes a few seconds; the data size is small, just a few rows. So will each Spark job cost some time for init or prepare work no matter what the job is? I mean, will the basic framework of a Spark job cost seconds? 2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk: Real-time is, of course, relative but you’ve mentioned the microsecond level. Spark is designed to process large amounts of data in a distributed fashion. No distributed system I know of could give any kind of guarantees at the microsecond level. Robin On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote: Hi, all I am using a spark jar in standalone mode, fetching data from different mysql instances and doing some actions, but I found the time is at the second level. So I want to know if a Spark job is suitable for real-time queries which need microseconds?
Re: Generalised Spark-HBase integration
Oops, yes, I'm still messing with the repo on a daily basis.. fixed On 28 July 2015 at 17:11, Ted Yu yuzhih...@gmail.com wrote: I got a compilation error: [INFO] /home/hbase/s-on-hbase/src/main/scala:-1: info: compiling [INFO] Compiling 18 source files to /home/hbase/s-on-hbase/target/classes at 1438099569598 [ERROR] /home/hbase/s-on-hbase/src/main/scala/org/apache/spark/hbase/examples/simple/HBaseTableSimple.scala:36: error: type mismatch; [INFO] found : Int [INFO] required: Short [INFO] while (scanner.advance) numCells += 1 [INFO]^ [ERROR] one error found FYI On Tue, Jul 28, 2015 at 8:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Re: Generalised Spark-HBase integration
Cool, will revisit, is your latest code visible publicly somewhere ? On 28 July 2015 at 17:14, Ted Malaska ted.mala...@cloudera.com wrote: Yup you should be able to do that with the APIs that are going into HBase. Let me know if you need to chat about the problem and how to implement it with the HBase apis. We have tried to cover any possible way to use HBase with Spark. Let us know if we missed anything if we did we will add it. On Tue, Jul 28, 2015 at 12:12 PM, Michal Haris michal.ha...@visualdna.com wrote: Hi Ted, yes, cloudera blog and your code was my starting point - but I needed something more spark-centric rather than on hbase. Basically doing a lot of ad-hoc transformations with RDDs that were based on HBase tables and then mutating them after series of iterative (bsp-like) steps. On 28 July 2015 at 17:06, Ted Malaska ted.mala...@cloudera.com wrote: Thanks Michal, Just to share what I'm working on in a related topic. So a long time ago I build SparkOnHBase and put it into Cloudera Labs in this link. http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Also recently I am working on getting this into HBase core. It will hopefully be in HBase core with in the next couple of weeks. https://issues.apache.org/jira/browse/HBASE-13992 Then I'm planing on adding dataframe and bulk load support through https://issues.apache.org/jira/browse/HBASE-14149 https://issues.apache.org/jira/browse/HBASE-14150 Also if you are interested this is running today a at least a half a dozen companies with Spark Streaming. Here is one blog post of successful implementation http://blog.cloudera.com/blog/2015/03/how-edmunds-com-used-spark-streaming-to-build-a-near-real-time-dashboard/ Also here is an additional example blog I also put together http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ Let me know if you have any questions, also let me know if you want to connect to join efforts. Ted Malaska On Tue, Jul 28, 2015 at 11:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
projection optimization?
If I have a Hive table with six columns and create a DataFrame (Spark 1.4.1) using a sqlContext.sql(select * from ...) query, the resulting physical plan shown by explain reflects the goal of returning all six columns. If I then call select(one_column) on that first DataFrame, the resulting DataFrame still gives a physical plan of fetching all six columns. Shouldn't the subsequent select() have pruned the projections in the physical plan?
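A small way to check this on 1.4.x (table and column names here are made up); if the second plan still scans all six columns, it looks like a missed optimisation worth raising:

```scala
// Hypothetical 6-column Hive table "events": compare the physical plans.
val all = sqlContext.sql("select * from events")
all.explain()                       // expect a scan of all six columns

val one = all.select("event_id")    // project a single column afterwards
one.explain()                       // check whether the scan is pruned to event_id

// Putting the projection in the SQL itself avoids relying on later pruning:
sqlContext.sql("select event_id from events").explain()
```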
PySpark MLlib Numpy Dependency
The documentation for the Numpy dependency for MLlib seems somewhat vague [1]. Is Numpy only a dependency for the driver node, or must it also be installed on every worker node? Thanks, Alek [1] -- http://spark.apache.org/docs/latest/mllib-guide.html#dependencies
Generalised Spark-HBase integration
Hi all, for the last couple of months I've been working on a large graph analytics project and along the way have written from scratch an HBase-Spark integration, as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) Spark module, which works with the latest Spark and the new HBase API, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
sc.parallelise to work more like a producer/consumer?
Hi, I am using sc.parallelise(...32k of items) several times for 1 job. Each executor takes x amount of time to process its items, but this results in some executors finishing quickly and staying idle till the others catch up. Only after all executors complete the first 32k batch is the next batch sent for processing. Is there a way to make it work more like a producer/consumer? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sc-parallelise-to-work-more-like-a-producer-consumer-tp24032.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark-csv number of partitions
Hello, I'm using spark-csv instead of sc.textFile() to work with CSV files. How can I set the number of partitions that will be created when reading a CSV? Basically an equivalent for minPartitions in textFile(): val myrdd = sc.textFile("my.csv", 24) Srikanth
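As far as I know the spark-csv reader doesn't expose a minPartitions-style option, so a common workaround is to repartition right after the load; a rough sketch assuming spark-csv 1.x (file path and partition count are just the values from the question):

```scala
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("my.csv")
  .repartition(24)                   // roughly the effect of minPartitions = 24

println(df.rdd.partitions.length)    // verify the resulting partition count
```

Note that repartition() incurs a shuffle, unlike minPartitions on textFile().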
Re: Generalised Spark-HBase integration
I got a compilation error: [INFO] /home/hbase/s-on-hbase/src/main/scala:-1: info: compiling [INFO] Compiling 18 source files to /home/hbase/s-on-hbase/target/classes at 1438099569598 [ERROR] /home/hbase/s-on-hbase/src/main/scala/org/apache/spark/hbase/examples/simple/HBaseTableSimple.scala:36: error: type mismatch; [INFO] found : Int [INFO] required: Short [INFO] while (scanner.advance) numCells += 1 [INFO]^ [ERROR] one error found FYI On Tue, Jul 28, 2015 at 8:59 AM, Michal Haris michal.ha...@visualdna.com wrote: Hi all, last couple of months I've been working on a large graph analytics and along the way have written from scratch a HBase-Spark integration as none of the ones out there worked either in terms of scale or in the way they integrated with the RDD interface. This week I have generalised it into an (almost) spark module, which works with the latest spark and the new hbase api, so... sharing! : https://github.com/michal-harish/spark-on-hbase -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
Messages are not stored for actorStream when using RoundRobinRouter
Hi, I'm using a simple akka actor to create a actorStream. The actor just forwards the messages received to the stream by calling super[ActorHelper].store(msg). This works ok when I create the stream with ssc.actorStream[A](Props(new ProxyReceiverActor[A]), receiverActorName) but when I try to use a RoundRobinRouter in order to obtain more receiver parallelism then all the messages are lost. ssc.actorStream[A](Props(new ProxyReceiverActor[A]).withRouter(RoundRobinRouter(nrOfInstances=3)), receiverActorName) In the logs I can see that 3 actors are created, and that the messages are received, but the call to store() seems to do nothing. Anyone can help me with this? Thanks a lot in advance, Greetings, Juan
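One workaround sketch (not an explanation of why the router loses messages): skip the router and create several independent actor receivers, then union the resulting DStreams to get receiver parallelism; the names reuse those from the question above:

```scala
// Three separate receivers, each with its own ProxyReceiverActor instance.
val streams = (1 to 3).map { i =>
  ssc.actorStream[A](Props(new ProxyReceiverActor[A]), s"$receiverActorName-$i")
}
val unioned = ssc.union(streams)
```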
Iterating over values by Key
I have K/V pairs where V is an Iterable (from a previous groupBy). I use the Java API. What I want is to iterate over the values by key and, on every element, set a previousElementId attribute, that is, the id of the previous element in the sorted list. I try to do this with mapValues: I create an array from the Iterable, sort it there and iterate over the values, setting the attribute and saving the reference for the next element. After that, I return the array. Is this the best approach or am I missing something? Spark version: 1.4.1 Java version: 1.8 Thanks in advance. Mate -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Iterating-over-values-by-Key-tp24029.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
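For reference, a minimal sketch of that mapValues pattern (in Scala for brevity; the Java API is analogous). The element type, its field names and the sort key are placeholders taken from the description above, and this assumes each key's values fit comfortably in memory:

```scala
// Placeholder element type; the real class and fields are assumptions.
case class Element(id: Long, var previousElementId: Option[Long] = None)

// grouped: an RDD of (key, Iterable[Element]) pairs from the earlier groupBy
val linked = grouped.mapValues { values =>
  val sorted = values.toArray.sortBy(_.id)   // materialise and sort per key
  var prev: Option[Long] = None
  sorted.foreach { e =>
    e.previousElementId = prev               // link to the previous element
    prev = Some(e.id)
  }
  sorted
}
```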
spark streaming get kafka individual message's offset and partition no
Hi, I am processing Kafka messages using Spark Streaming 1.3. I am using the mapPartitions function to process Kafka messages. How can I access the offset no. of each individual message being processed? JavaPairInputDStream<byte[], byte[]> directKafkaStream = KafkaUtils.createDirectStream(..); directKafkaStream.mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>() { public Iterable<String> call(Iterator<Tuple2<byte[], byte[]>> t) throws Exception { while (t.hasNext()) { Tuple2<byte[], byte[]> tuple = t.next(); byte[] key = tuple._1(); byte[] msg = tuple._2(); // how to get kafka partition no and offset of this message } } });
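Not an authoritative answer, but a sketch (in Scala for brevity) of the usual pattern with the 1.3+ direct stream: partition i of the underlying KafkaRDD corresponds to offsetRanges(i), so a per-message offset can be derived as fromOffset plus the position within the partition. ssc, kafkaParams and topics are assumed to be defined as in the createDirectStream call above:

```scala
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  ssc, kafkaParams, topics)

val withOffsets = stream.transform { rdd =>
  // OffsetRange i describes exactly the i-th partition of this KafkaRDD
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    val range = ranges(i)
    iter.zipWithIndex.map { case ((key, msg), pos) =>
      (range.topic, range.partition, range.fromOffset + pos, msg)
    }
  }
}
```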
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
Does adding back the cygwin detection and this clause make it work? if $cygwin; then CLASSPATH=`cygpath -wp $CLASSPATH` fi If so I imagine that's fine to bring back, if that's still needed. On Tue, Jul 28, 2015 at 9:49 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Thanks Owen, the problem under Cygwin is while run spark-submit under 1.4.0, it simply report Error: Could not find or load main class org.apache.spark.launcher.Main This is because under Cygwin spark-class make the LAUNCH_CLASSPATH as /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar But under Cygwin java in Windows cannot recognize the classpath, so below command simply error out java -cp /c/spark-1.4.0-bin-hadoop2.3/lib/spark-assembly-1.4.0-hadoop2.3.0.jar org.apache.spark.launcher.Main Error: Could not find or load main class org.apache.spark.launcher.Main Thanks Proust From:Sean Owen so...@cloudera.com To:Proust GZ Feng/China/IBM@IBMCN Cc:user user@spark.apache.org Date:07/28/2015 02:20 PM Subject:Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0 It wasn't removed, but rewritten. Cygwin is just a distribution of POSIX-related utilities so you should be able to use the normal .sh scripts. In any event, you didn't say what the problem is? On Tue, Jul 28, 2015 at 5:19 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Spark Users Looks like Spark 1.4.0 cannot work with Cygwin due to the removing of Cygwin support in bin/spark-class The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3 The changeset said Add a library for launching Spark jobs programmatically, but how to use it in Cygwin? I'm wondering any solutions available to make it work in Windows? Thanks Proust - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: *Metrics API is odd in MLLib
Hi Xiangrui & Spark People, I recently got round to writing an evaluation framework for Spark that I was hoping to PR into MLLib, and this would solve some of the aforementioned issues. I have put the code on github in a separate repo for now as I would like to get some sandboxed feedback. The repo, complete with detailed documentation, can be found here https://github.com/samthebest/sceval. Many thanks, Sam On Thu, Jun 18, 2015 at 11:00 AM, Sam samthesav...@gmail.com wrote: Firstly apologies for the header of my email containing some junk, I believe it's due to a copy and paste error on a smart phone. Thanks for your response. I will indeed make the PR you suggest, though glancing at the code I realize it's not just a case of making these public since the types are also private. Then, there is certain functionality I will be exposing, which then ought to be tested, e.g. every bin except potentially the last will have an equal number of data points in it*. I'll get round to it at some point. As for BinaryClassificationMetrics using Double for labels, thanks for the explanation. If I were to make a PR to encapsulate the underlying implementation (that uses LabeledPoint) and change the type to Boolean, what would be the impact on versioning (since I'd be changing public API)? An alternative would be to create a new wrapper class, say BinaryClassificationMeasures, and deprecate the old with the intention of migrating all the code into the new class. * Maybe some other part of the code base tests this, since this assumption must hold in order to average across folds in x-validation? On Thu, Jun 18, 2015 at 1:02 AM, Xiangrui Meng men...@gmail.com wrote: LabeledPoint was used for both classification and regression, where label type is Double for simplicity. So in BinaryClassificationMetrics, we still use Double for labels. We compute the confusion matrix at each threshold internally, but this is not exposed to users ( https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala#L127). Feel free to submit a PR to make it public. -Xiangrui On Mon, Jun 15, 2015 at 7:13 AM, Sam samthesav...@gmail.com wrote: According to https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics the constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles; this seems odd, shouldn't it be Boolean? Similarly for MultilabelMetrics (i.e. should it not be RDD[(Array[Double], Array[Boolean])]), and for MulticlassMetrics the type of both should be generic? Additionally it would be good if either the ROC output type was changed or another method was added that returned confusion matrices, so that the hard integer values can be obtained before the divisions. E.g. ``` case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int) { // bunch of methods for each of the things in the table here https://en.wikipedia.org/wiki/Receiver_operating_characteristic } ... def confusions(): RDD[Confusion] ```
Authentication Support with spark-submit cluster mode
Hi, I'd like to remotely run spark-submit from a local machine to submit a job to a Spark cluster (cluster mode). What method do I use to authenticate myself to the cluster? For example, how do I pass a user id, password or private key to the cluster? Any help is appreciated.
SparkR does not include SparkContext
Hi, I'm starting R on Spark via the sparkR script but I can't access the SparkContext as described in the programming guide. Any ideas? Thanks, Siegfried
Re: Is SPARK is the right choice for traditional OLAP query processing?
You may check out apache phoenix on top of Hbase for this. However, it does not have ODBC drivers, but JDBC ones. Maybe Hive 1.2 with a new version of TEZ will also serve your purpose. You should run some proof of concept with these technologies using real or generated data. About how much data are we talking about in the fact and dimensional tables. Are thé dimensional tables small or large? Does your current software setup not satisfy your requirements? Le mar. 21 juil. 2015 à 2:06, renga.kannan renga.kan...@gmail.com a écrit : All, I really appreciate anyone's input on this. We are having a very simple traditional OLAP query processing use case. Our use case is as follows. 1. We have a customer sales order table data coming from RDBMs table. 2. There are many dimension columns in the sales order table. For each of those dimensions, we have individual dimension tables that stores the dimension record sets. 3. We also have some BI like hierarchies that is defined for dimension data set. What we want for business users is as follows.? 1. We wanted to show some aggregated values from sales Order transaction table columns. 2. User would like to filter these with specific dimension values from dimension table. 3. User should be able to drill down from higher level to lower level by traversing hierarchy on dimension We want these use actions respond within 2 to 5 seconds. We are thinking about using SPARK as our backend enginee to sever data to these front end application. Has anyone tried using SPARK for these kind of use cases. These are all traditional use cases in BI space. If so, can SPARK respond to these queries with in 2 to 5 seconds for large data sets. Thanks, Renga -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-SPARK-is-the-right-choice-for-traditional-OLAP-query-processing-tp23921.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Does spark-submit support file transfering from local to cluster?
Hi, I'm using spark-submit cluster mode to submit a job from local to spark cluster. There are input files, output files, and job log files that I need to transfer in and out between local machine and spark cluster.Any recommendation methods to use file transferring. Is there any future plan that spark will support file transferring from cluster to local and vice versa. Any help is appreciated. Thanks.
Re: Is SPARK is the right choice for traditional OLAP query processing?
We want these use actions respond within 2 to 5 seconds. I think this goal is a stretch for Spark. Some queries may run faster than that on a large dataset, but in general you can't put an SLA like this. For example if you have to join some huge datasets, you'll likely will be much over that. Spark is great for huge jobs and it'll be much faster than MR. I don't think Spark was designed with interactive queries in mind. For example, although Spark is in-memory, its in-memory is only for a job. It's not like in traditional RDBMS systems where you have a persistent buffer cache or in-memory columnar storage (both are Oracle terms) If you have multiple users running interatactive BI queries, results that were cached for first user wouldn't be used by second user. Unless you invent something that would keep a persistent Spark context and serve users' requests and decided which RDDs to cache, when and how. At least that's my understanding how Spark works. If I'm wrong, I will be glad to hear that as we ran into the same questions. As we use Cloudera's CDH, I'm not sure where Hortonworks are with their Tez project, but Tez has components that resemble closer to buffer cache or in-memory columnar storage caching from traditional RDBMS systems, and may get better and/or more predictable performance on BI queries. -- Ruslan Dautkhanov On Mon, Jul 20, 2015 at 6:04 PM, renga.kannan renga.kan...@gmail.com wrote: All, I really appreciate anyone's input on this. We are having a very simple traditional OLAP query processing use case. Our use case is as follows. 1. We have a customer sales order table data coming from RDBMs table. 2. There are many dimension columns in the sales order table. For each of those dimensions, we have individual dimension tables that stores the dimension record sets. 3. We also have some BI like hierarchies that is defined for dimension data set. What we want for business users is as follows.? 1. We wanted to show some aggregated values from sales Order transaction table columns. 2. User would like to filter these with specific dimension values from dimension table. 3. User should be able to drill down from higher level to lower level by traversing hierarchy on dimension We want these use actions respond within 2 to 5 seconds. We are thinking about using SPARK as our backend enginee to sever data to these front end application. Has anyone tried using SPARK for these kind of use cases. These are all traditional use cases in BI space. If so, can SPARK respond to these queries with in 2 to 5 seconds for large data sets. Thanks, Renga -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-SPARK-is-the-right-choice-for-traditional-OLAP-query-processing-tp23921.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Data from PostgreSQL to Spark
Sqoop’s incremental data fetch will reduce the data size you need to pull from source, but then by the time that incremental data fetch is complete, is it not current again, if velocity of the data is high? May be you can put a trigger in Postgres to send data to the big data cluster as soon as changes are made. Or as I was saying in another email, can the source write to Kafka/Flume/Hbase in addition to Postgres? Sent from Windows Mail From: Jeetendra Gangele Sent: Tuesday, July 28, 2015 5:43 AM To: santosh...@gmail.com Cc: ayan guha, felixcheun...@hotmail.com, user@spark.apache.org I trying do that, but there will always data mismatch, since by the time scoop is fetching main database will get so many updates. There is something called incremental data fetch using scoop but that hits a database rather than reading the WAL edit. On 28 July 2015 at 02:52, santosh...@gmail.com wrote: Why cant you bulk pre-fetch the data to HDFS (like using Sqoop) instead of hitting Postgres multiple times? Sent from Windows Mail From: ayan guha Sent: Monday, July 27, 2015 4:41 PM To: Jeetendra Gangele Cc: felixcheun...@hotmail.com, user@spark.apache.org You can call dB connect once per partition. Please have a look at design patterns of for each construct in document. How big is your data in dB? How soon that data changes? You would be better off if data is in spark already On 28 Jul 2015 04:48, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for your reply. Parallel i will be hitting around 6000 call to postgreSQl which is not good my database will die. these calls to database will keeps on increasing. Handling millions on request is not an issue with Hbase/NOSQL any other alternative? On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote: You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach since you mention copying that data into HBase. From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user Hi All I have a use case where where I am consuming the Events from RabbitMQ using spark streaming.This event has some fields on which I want to query the PostgreSQL and bring the data and then do the join between event data and PostgreSQl data and put the aggregated data into HDFS, so that I run run analytics query over this data using SparkSQL. my question is PostgreSQL data in production data so i don't want to hit so many times. at any given 1 seconds time I may have 3000 events,that means I need to fire 3000 parallel query to my PostGreSQl and this data keeps on growing, so my database will go down. I can't migrate this PostgreSQL data since lots of system using it,but I can take this data to some NOSQL like base and query the Hbase, but here issue is How can I make sure that Hbase has upto date data? Any anyone suggest me best approach/ method to handle this case? Regards Jeetendra
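For reference, a minimal sketch of the "data access API" route mentioned earlier in this thread, i.e. reading a PostgreSQL table through the Spark SQL JDBC data source (Spark 1.3/1.4). The connection URL, table name and credentials are placeholders, and the PostgreSQL JDBC driver jar is assumed to be on the classpath:

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")        // placeholder credentials
props.setProperty("password", "secret")

val dim = sqlContext.read.jdbc(
  "jdbc:postgresql://pg-host:5432/mydb",   // placeholder connection URL
  "public.dimension_table",                // placeholder table name
  props)

dim.registerTempTable("dimension_table")   // join against the streaming data in Spark SQL
```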
Re: hive.contrib.serde2.RegexSerDe not found
Try use: org.apache.hadoop.hive.serde2.RegexSerDe GP On 27 Jul 2015, at 09:35, ZhuGe t...@outlook.commailto:t...@outlook.com wrote: Hi all: I am testing the performance of hive on spark sql. The existing table is created with ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe' WITH SERDEPROPERTIES ( 'input.regex' = '(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)' ,'output.format.string' = '%1$s %2$s %3$s %4$s %5$s %16$s %7$s %8$s %9$s %10$s %11$s %12$s %13$s %14$s %15$s %16$s %17$s ') STORED AS TEXTFILE location '/data/BaseData/wx/xx/xx/xx/xx'; When i use spark sql(spark-shell) to query the existing table, got exception like this: Caused by: MetaException(message:java.lang.ClassNotFoundException Class org.apache.hadoop.hive.contrib.serde2.RegexSerDe not found) at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:382) at org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249) I add the jar dependency in the spark-shell command, still do not work. SPARK_SUBMIT_OPTS=-XX:MaxPermSize=256m ./bin/spark-shell --jars /data/dbcenter/cdh5/spark-1.4.0-bin-hadoop2.4/hive-contrib-0.13.1-cdh5.2.0.jar,postgresql-9.2-1004-jdbc41.jar How should i fix the problem? Cheers
Re: Which directory contains third party libraries for Spark
Hey Stephen, In case these libraries exist on the client as a form of maven library, you can use --packages to ship the library and all it's dependencies, without building an uber jar. Best, Burak On Tue, Jul 28, 2015 at 10:23 AM, Marcelo Vanzin van...@cloudera.com wrote: Hi Stephen, There is no such directory currently. If you want to add an existing jar to every app's classpath, you need to modify two config values: spark.driver.extraClassPath and spark.executor.extraClassPath. On Mon, Jul 27, 2015 at 10:22 PM, Stephen Boesch java...@gmail.com wrote: when using spark-submit: which directory contains third party libraries that will be loaded on each of the slaves? I would like to scp one or more libraries to each of the slaves instead of shipping the contents in the application uber-jar. Note: I did try adding to $SPARK_HOME/lib_managed/jars. But the spark-submit still results in a ClassNotFoundException for classes included in the added library. -- Marcelo
Re: Data from PostgreSQL to Spark
can the source write to Kafka/Flume/Hbase in addition to Postgres? no it can't write ,this is due to the fact that there are many applications those are producing this postGreSql data.I can't really asked all the teams to start writing to some other source. velocity of the application is too high. On 28 July 2015 at 21:50, santosh...@gmail.com wrote: Sqoop’s incremental data fetch will reduce the data size you need to pull from source, but then by the time that incremental data fetch is complete, is it not current again, if velocity of the data is high? May be you can put a trigger in Postgres to send data to the big data cluster as soon as changes are made. Or as I was saying in another email, can the source write to Kafka/Flume/Hbase in addition to Postgres? Sent from Windows Mail *From:* Jeetendra Gangele gangele...@gmail.com *Sent:* Tuesday, July 28, 2015 5:43 AM *To:* santosh...@gmail.com *Cc:* ayan guha guha.a...@gmail.com, felixcheun...@hotmail.com, user@spark.apache.org I trying do that, but there will always data mismatch, since by the time scoop is fetching main database will get so many updates. There is something called incremental data fetch using scoop but that hits a database rather than reading the WAL edit. On 28 July 2015 at 02:52, santosh...@gmail.com wrote: Why cant you bulk pre-fetch the data to HDFS (like using Sqoop) instead of hitting Postgres multiple times? Sent from Windows Mail *From:* ayan guha guha.a...@gmail.com *Sent:* Monday, July 27, 2015 4:41 PM *To:* Jeetendra Gangele gangele...@gmail.com *Cc:* felixcheun...@hotmail.com, user@spark.apache.org You can call dB connect once per partition. Please have a look at design patterns of for each construct in document. How big is your data in dB? How soon that data changes? You would be better off if data is in spark already On 28 Jul 2015 04:48, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for your reply. Parallel i will be hitting around 6000 call to postgreSQl which is not good my database will die. these calls to database will keeps on increasing. Handling millions on request is not an issue with Hbase/NOSQL any other alternative? On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote: You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach since you mention copying that data into HBase. From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user Hi All I have a use case where where I am consuming the Events from RabbitMQ using spark streaming.This event has some fields on which I want to query the PostgreSQL and bring the data and then do the join between event data and PostgreSQl data and put the aggregated data into HDFS, so that I run run analytics query over this data using SparkSQL. my question is PostgreSQL data in production data so i don't want to hit so many times. at any given 1 seconds time I may have 3000 events,that means I need to fire 3000 parallel query to my PostGreSQl and this data keeps on growing, so my database will go down. I can't migrate this PostgreSQL data since lots of system using it,but I can take this data to some NOSQL like base and query the Hbase, but here issue is How can I make sure that Hbase has upto date data? Any anyone suggest me best approach/ method to handle this case? Regards Jeetendra
Re: Which directory contains third party libraries for Spark
Hi Stephen, There is no such directory currently. If you want to add an existing jar to every app's classpath, you need to modify two config values: spark.driver.extraClassPath and spark.executor.extraClassPath. On Mon, Jul 27, 2015 at 10:22 PM, Stephen Boesch java...@gmail.com wrote: when using spark-submit: which directory contains third party libraries that will be loaded on each of the slaves? I would like to scp one or more libraries to each of the slaves instead of shipping the contents in the application uber-jar. Note: I did try adding to $SPARK_HOME/lib_managed/jars. But the spark-submit still results in a ClassNotFoundException for classes included in the added library. -- Marcelo
Re: Data from PostgreSQL to Spark
Can you put some transparent cache in front of the database? Or some jdbc proxy? Le mar. 28 juil. 2015 à 19:34, Jeetendra Gangele gangele...@gmail.com a écrit : can the source write to Kafka/Flume/Hbase in addition to Postgres? no it can't write ,this is due to the fact that there are many applications those are producing this postGreSql data.I can't really asked all the teams to start writing to some other source. velocity of the application is too high. On 28 July 2015 at 21:50, santosh...@gmail.com wrote: Sqoop’s incremental data fetch will reduce the data size you need to pull from source, but then by the time that incremental data fetch is complete, is it not current again, if velocity of the data is high? May be you can put a trigger in Postgres to send data to the big data cluster as soon as changes are made. Or as I was saying in another email, can the source write to Kafka/Flume/Hbase in addition to Postgres? Sent from Windows Mail *From:* Jeetendra Gangele gangele...@gmail.com *Sent:* Tuesday, July 28, 2015 5:43 AM *To:* santosh...@gmail.com *Cc:* ayan guha guha.a...@gmail.com, felixcheun...@hotmail.com, user@spark.apache.org I trying do that, but there will always data mismatch, since by the time scoop is fetching main database will get so many updates. There is something called incremental data fetch using scoop but that hits a database rather than reading the WAL edit. On 28 July 2015 at 02:52, santosh...@gmail.com wrote: Why cant you bulk pre-fetch the data to HDFS (like using Sqoop) instead of hitting Postgres multiple times? Sent from Windows Mail *From:* ayan guha guha.a...@gmail.com *Sent:* Monday, July 27, 2015 4:41 PM *To:* Jeetendra Gangele gangele...@gmail.com *Cc:* felixcheun...@hotmail.com, user@spark.apache.org You can call dB connect once per partition. Please have a look at design patterns of for each construct in document. How big is your data in dB? How soon that data changes? You would be better off if data is in spark already On 28 Jul 2015 04:48, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for your reply. Parallel i will be hitting around 6000 call to postgreSQl which is not good my database will die. these calls to database will keeps on increasing. Handling millions on request is not an issue with Hbase/NOSQL any other alternative? On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote: You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach since you mention copying that data into HBase. From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user Hi All I have a use case where where I am consuming the Events from RabbitMQ using spark streaming.This event has some fields on which I want to query the PostgreSQL and bring the data and then do the join between event data and PostgreSQl data and put the aggregated data into HDFS, so that I run run analytics query over this data using SparkSQL. my question is PostgreSQL data in production data so i don't want to hit so many times. at any given 1 seconds time I may have 3000 events,that means I need to fire 3000 parallel query to my PostGreSQl and this data keeps on growing, so my database will go down. I can't migrate this PostgreSQL data since lots of system using it,but I can take this data to some NOSQL like base and query the Hbase, but here issue is How can I make sure that Hbase has upto date data? Any anyone suggest me best approach/ method to handle this case? Regards Jeetendra
Actor not found for: ActorSelection
I just cloned the master repository of Spark from Github. I am running it on OSX 10.9, Spark 1.4.1 and Scala 2.10.4 I just tried to run the SparkPi example program using IntelliJ Idea but get the error : akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkMaster@myhost:7077/) I did checkout a similar post http://apache-spark-user-list.1001560.n3.nabble.com/Actor-not-found-td22265.html but found no solution. Find the complete stack trace below. Any help would be really appreciated. 2015-07-28 22:16:45,888 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Running Spark version 1.5.0-SNAPSHOT 2015-07-28 22:16:47,125 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:clinit(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2015-07-28 22:16:47,753 INFO [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing view acls to: mac 2015-07-28 22:16:47,755 INFO [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing modify acls to: mac 2015-07-28 22:16:47,756 INFO [main] spark.SecurityManager (Logging.scala:logInfo(59)) - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(mac); users with modify permissions: Set(mac) 2015-07-28 22:16:49,454 INFO [sparkDriver-akka.actor.default-dispatcher-2] slf4j.Slf4jLogger (Slf4jLogger.scala:applyOrElse(80)) - Slf4jLogger started 2015-07-28 22:16:49,695 INFO [sparkDriver-akka.actor.default-dispatcher-2] Remoting (Slf4jLogger.scala:apply$mcV$sp(74)) - Starting remoting 2015-07-28 22:16:50,167 INFO [sparkDriver-akka.actor.default-dispatcher-2] Remoting (Slf4jLogger.scala:apply$mcV$sp(74)) - Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.2.105:49981] 2015-07-28 22:16:50,215 INFO [main] util.Utils (Logging.scala:logInfo(59)) - Successfully started service 'sparkDriver' on port 49981. 2015-07-28 22:16:50,372 INFO [main] spark.SparkEnv (Logging.scala:logInfo(59)) - Registering MapOutputTracker 2015-07-28 22:16:50,596 INFO [main] spark.SparkEnv (Logging.scala:logInfo(59)) - Registering BlockManagerMaster 2015-07-28 22:16:50,948 INFO [main] storage.DiskBlockManager (Logging.scala:logInfo(59)) - Created local directory at /private/var/folders/8k/jfw576r50m97rlk5qpj1n4l8gn/T/blockmgr-309db4d1-d129-43e5-a90e-12cf51ad491f 2015-07-28 22:16:51,198 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - MemoryStore started with capacity 491.7 MB 2015-07-28 22:16:51,707 INFO [main] spark.HttpFileServer (Logging.scala:logInfo(59)) - HTTP File server directory is /private/var/folders/8k/jfw576r50m97rlk5qpj1n4l8gn/T/spark-f28e24e7-b798-4365-8209-409d8b27ad2f/httpd-ce32c41d-b618-49e9-bec1-f409454f3679 2015-07-28 22:16:51,777 INFO [main] spark.HttpServer (Logging.scala:logInfo(59)) - Starting HTTP Server 2015-07-28 22:16:52,091 INFO [main] server.Server (Server.java:doStart(272)) - jetty-8.1.14.v20131031 2015-07-28 22:16:52,116 INFO [main] server.AbstractConnector (AbstractConnector.java:doStart(338)) - Started SocketConnector@0.0.0.0:49982 2015-07-28 22:16:52,116 INFO [main] util.Utils (Logging.scala:logInfo(59)) - Successfully started service 'HTTP file server' on port 49982. 
2015-07-28 22:16:52,249 INFO [main] spark.SparkEnv (Logging.scala:logInfo(59)) - Registering OutputCommitCoordinator 2015-07-28 22:16:54,253 INFO [main] server.Server (Server.java:doStart(272)) - jetty-8.1.14.v20131031 2015-07-28 22:16:54,315 INFO [main] server.AbstractConnector (AbstractConnector.java:doStart(338)) - Started SelectChannelConnector@0.0.0.0:4040 2015-07-28 22:16:54,317 INFO [main] util.Utils (Logging.scala:logInfo(59)) - Successfully started service 'SparkUI' on port 4040. 2015-07-28 22:16:54,386 INFO [main] ui.SparkUI (Logging.scala:logInfo(59)) - Started SparkUI at http://192.168.2.105:4040 2015-07-28 22:16:54,924 WARN [main] metrics.MetricsSystem (Logging.scala:logWarning(71)) - Using default name DAGScheduler for source because spark.app.id is not set. 2015-07-28 22:16:55,132 INFO [appclient-register-master-threadpool-0] client.AppClient$ClientEndpoint (Logging.scala:logInfo(59)) - Connecting to master spark://myhost:7077... 2015-07-28 22:16:55,392 WARN [sparkDriver-akka.actor.default-dispatcher-14] client.AppClient$ClientEndpoint (Logging.scala:logWarning(71)) - Could not connect to myhost:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@myhost:7077] 2015-07-28 22:16:55,412 WARN [sparkDriver-akka.actor.default-dispatcher-14] remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkMaster@myhost:7077] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://sparkMaster@myhost:7077]] Caused by: [myhost: unknown error] 2015-07-28 22:16:55,447 WARN [appclient-register-master-threadpool-0] client.AppClient$ClientEndpoint
Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode
Thanks Corey for your answer, Do you mean that final status : SUCCEEDED in terminal logs means that YARN RM could clean the resources after the application has finished (application finishing does not necessarily mean succeeded or failed) ? With that logic it totally makes sense. Basically the YARN logs does not say anything about the Spark job itself. It just says that Spark job resources have been cleaned up after the job completed and returned back to Yarn. It would be great if Yarn logs could also say about the consequence of the job, because the user is interested in more about the job final status. Yarn related logs can be found in RM ,NM, DN, NN log files in detail. Thanks again. On Mon, Jul 27, 2015 at 7:45 PM, Corey Nolet cjno...@gmail.com wrote: Elkhan, What does the ResourceManager say about the final status of the job? Spark jobs that run as Yarn applications can fail but still successfully clean up their resources and give them back to the Yarn cluster. Because of this, there's a difference between your code throwing an exception in an executor/driver and the Yarn application failing. Generally you'll see a yarn application fail when there's a memory problem (too much memory being allocated or not enough causing executors to fail multiple times not allowing your job to finish). What I'm seeing from your post is that you had an exception in your application which was caught by the Spark framework which then proceeded to clean up the job and shut itself down- which it did successfully. When you aren't running in the Yarn modes, you aren't seeing any Yarn status that's telling you the Yarn application was successfully shut down, you are just seeing the failure(s) from your drivers/executors. On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Any updates on this bug ? Why Spark log results Job final status does not match ? (one saying that job has failed, another stating that job has succeeded) Thanks. On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, While running Spark Word count python example with intentional mistake in *Yarn cluster mode*, Spark terminal states final status as SUCCEEDED, but log files state correct results indicating that the job failed. Why terminal log output application log output contradict each other ? If i run same job on *local mode* then terminal logs and application logs match, where both state that job has failed to expected error in python script. More details: Scenario While running Spark Word count python example on *Yarn cluster mode*, if I make intentional error in wordcount.py by changing this line (I'm using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions - which i tested): lines = sc.textFile(sys.argv[1], 1) into this line: lines = sc.textFile(*nonExistentVariable*,1) where nonExistentVariable variable was never created and initialized. then i run that example with this command (I put README.md into HDFS before running this command): *./bin/spark-submit --master yarn-cluster wordcount.py /README.md* The job runs and finishes successfully according the log printed in the terminal : *Terminal logs*: ... 
15/07/23 16:19:17 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:18 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:19 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:20 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:21 INFO yarn.Client: Application report for application_1437612288327_0013 (state: FINISHED) 15/07/23 16:19:21 INFO yarn.Client: client token: N/A diagnostics: Shutdown hook called before final status was reported. ApplicationMaster host: 10.0.53.59 ApplicationMaster RPC port: 0 queue: default start time: 1437693551439 final status: *SUCCEEDED* tracking URL: http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1 user: edadashov 15/07/23 16:19:21 INFO util.Utils: Shutdown hook called 15/07/23 16:19:21 INFO util.Utils: Deleting directory /tmp/spark-eba0a1b5-a216-4afa-9c54-a3cb67b16444 But if look at log files generated for this application in HDFS - it indicates failure of the job with correct reason: *Application log files*: ... \00 stdout\00 179Traceback (most recent call last): File wordcount.py, line 32, in module lines = sc.textFile(nonExistentVariable,1) *NameError: name 'nonExistentVariable' is not defined* Why terminal output - final status: *SUCCEEDED , *is not matching application log results - failure of the job (NameError: name 'nonExistentVariable' is not defined) ? Is this bug ? Is there Jira ticket related to this issue ? (Is someone assigned to
DataFrame DAG recomputed even though DataFrame is cached?
Hi, I'm puzzling over the following problem: when I cache a small sample of a big dataframe, the small dataframe is recomputed when selecting a column (but not if show() or count() is invoked). Why is that so and how can I avoid recomputation of the small sample dataframe? More details: - I have a big dataframe df of ~190million rows and ~10 columns, obtained via 3 different joins; I cache it and invoke count() to make sure it really is in memory and confirm in web UI - val sdf = df.sample(false, 1e-6); sdf.cache(); sdf.count() // 170 lines; cached is also confirmed in webUI, size in memory is 150kB *- sdf.select(colname).show() // this triggers a complete recomputation of sdf with 3 joins!* - show(), count() or take() do not trigger the recomputation of the 3 joins, but select(), collect() or withColumn() do. I have --executor-memory 30G --driver-memory 10g, so memory is not a problem. I'm using Spark 1.4.0. Could anybody shed some light on this or where I can find more info? Many thanks, Kristina
Fighting against performance: JDBC RDD badly distributed
Hi all, I am experimenting with and learning about performance on big tasks locally, with a 32-core node and more than 64GB of RAM; data is loaded from a database through a JDBC driver, and heavy computations are launched against it. I am presented with two questions: 1. My RDD is poorly distributed. I am partitioning into 32 pieces, but the first 31 pieces are extremely lightweight compared to piece 32 15/07/28 13:37:48 INFO Executor: Finished task 30.0 in stage 0.0 (TID 30). 1419 bytes result sent to driver 15/07/28 13:37:48 INFO TaskSetManager: Starting task 31.0 in stage 0.0 (TID 31, localhost, PROCESS_LOCAL, 1539 bytes) 15/07/28 13:37:48 INFO Executor: Running task 31.0 in stage 0.0 (TID 31) 15/07/28 13:37:48 INFO TaskSetManager: Finished task 30.0 in stage 0.0 (TID 30) in 2798 ms on localhost (31/32) 15/07/28 13:37:48 INFO CacheManager: Partition rdd_2_31 not found, computing it ...All pieces take 3 seconds while the last one takes around 15 minutes to compute... Is there anything I can do about this? Preferably without reshuffling, i.e. in the DataFrameReader JDBC options (lowerBound, upperBound, partition column). 2. After a long time of processing, sometimes I get OOMs; I have failed to find a how-to for falling back and retrying from already persisted data to avoid losing time. Thanks, Saif
Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode
This might be an issue with how pyspark propagates the error back to the AM. I'm pretty sure this does not happen for Scala / Java apps. Have you filed a bug? On Tue, Jul 28, 2015 at 11:17 AM, Elkhan Dadashov elkhan8...@gmail.com wrote: Thanks Corey for your answer, Do you mean that final status : SUCCEEDED in terminal logs means that YARN RM could clean the resources after the application has finished (application finishing does not necessarily mean succeeded or failed) ? With that logic it totally makes sense. Basically the YARN logs does not say anything about the Spark job itself. It just says that Spark job resources have been cleaned up after the job completed and returned back to Yarn. It would be great if Yarn logs could also say about the consequence of the job, because the user is interested in more about the job final status. Yarn related logs can be found in RM ,NM, DN, NN log files in detail. Thanks again. On Mon, Jul 27, 2015 at 7:45 PM, Corey Nolet cjno...@gmail.com wrote: Elkhan, What does the ResourceManager say about the final status of the job? Spark jobs that run as Yarn applications can fail but still successfully clean up their resources and give them back to the Yarn cluster. Because of this, there's a difference between your code throwing an exception in an executor/driver and the Yarn application failing. Generally you'll see a yarn application fail when there's a memory problem (too much memory being allocated or not enough causing executors to fail multiple times not allowing your job to finish). What I'm seeing from your post is that you had an exception in your application which was caught by the Spark framework which then proceeded to clean up the job and shut itself down- which it did successfully. When you aren't running in the Yarn modes, you aren't seeing any Yarn status that's telling you the Yarn application was successfully shut down, you are just seeing the failure(s) from your drivers/executors. On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Any updates on this bug ? Why Spark log results Job final status does not match ? (one saying that job has failed, another stating that job has succeeded) Thanks. On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, While running Spark Word count python example with intentional mistake in *Yarn cluster mode*, Spark terminal states final status as SUCCEEDED, but log files state correct results indicating that the job failed. Why terminal log output application log output contradict each other ? If i run same job on *local mode* then terminal logs and application logs match, where both state that job has failed to expected error in python script. More details: Scenario While running Spark Word count python example on *Yarn cluster mode*, if I make intentional error in wordcount.py by changing this line (I'm using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions - which i tested): lines = sc.textFile(sys.argv[1], 1) into this line: lines = sc.textFile(*nonExistentVariable*,1) where nonExistentVariable variable was never created and initialized. then i run that example with this command (I put README.md into HDFS before running this command): *./bin/spark-submit --master yarn-cluster wordcount.py /README.md* The job runs and finishes successfully according the log printed in the terminal : *Terminal logs*: ... 
15/07/23 16:19:17 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:18 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:19 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:20 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:21 INFO yarn.Client: Application report for application_1437612288327_0013 (state: FINISHED) 15/07/23 16:19:21 INFO yarn.Client: client token: N/A diagnostics: Shutdown hook called before final status was reported. ApplicationMaster host: 10.0.53.59 ApplicationMaster RPC port: 0 queue: default start time: 1437693551439 final status: *SUCCEEDED* tracking URL: http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1 user: edadashov 15/07/23 16:19:21 INFO util.Utils: Shutdown hook called 15/07/23 16:19:21 INFO util.Utils: Deleting directory /tmp/spark-eba0a1b5-a216-4afa-9c54-a3cb67b16444 But if look at log files generated for this application in HDFS - it indicates failure of the job with correct reason: *Application log files*: ... \00 stdout\00 179Traceback (most recent call last): File wordcount.py, line 32, in module lines = sc.textFile(nonExistentVariable,1) *NameError: name 'nonExistentVariable' is not defined* Why
Re: Fighting against performance: JDBC RDD badly distributed
Hi Saif, Are you using JdbcRDD directly from Spark? If yes, then the poor distribution could be due to the bound key you used. See the JdbcRDD Scala doc at https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD : sql the text of the query. The query must contain two ? placeholders for parameters used to partition the results. E.g. select title, author from books where ? <= id and id <= ? lowerBound the minimum value of the first placeholder upperBound the maximum value of the second placeholder The lower and upper bounds are inclusive. numPartitions the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20) Shenyan On Tue, Jul 28, 2015 at 2:41 PM, saif.a.ell...@wellsfargo.com wrote: Hi all, I am experimenting with and learning about performance on big tasks locally, with a 32-core node and more than 64GB of RAM; data is loaded from a database through a JDBC driver, and heavy computations are launched against it. I am presented with two questions: 1. My RDD is poorly distributed. I am partitioning into 32 pieces, but the first 31 pieces are extremely lightweight compared to piece 32 15/07/28 13:37:48 INFO Executor: Finished task 30.0 in stage 0.0 (TID 30). 1419 bytes result sent to driver 15/07/28 13:37:48 INFO TaskSetManager: Starting task 31.0 in stage 0.0 (TID 31, localhost, PROCESS_LOCAL, 1539 bytes) 15/07/28 13:37:48 INFO Executor: Running task 31.0 in stage 0.0 (TID 31) 15/07/28 13:37:48 INFO TaskSetManager: Finished task 30.0 in stage 0.0 (TID 30) in 2798 ms on localhost (31/32) 15/07/28 13:37:48 INFO CacheManager: Partition rdd_2_31 not found, computing it ...All pieces take 3 seconds while the last one takes around 15 minutes to compute... Is there anything I can do about this? Preferably without reshuffling, i.e. in the DataFrameReader JDBC options (lowerBound, upperBound, partition column). 2. After a long time of processing, sometimes I get OOMs; I have failed to find a how-to for falling back and retrying from already persisted data to avoid losing time. Thanks, Saif
RE: Fighting against performance: JDBC RDD badly distributed
Thank you for your response Zhen, I am using some vendor specific JDBC driver JAR file (honestly I dont know where it came from). It’s api is NOT like JdbcRDD, instead, more like jdbc from DataFrameReader https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader So I ask two questions now: 1. Will running a query using JdbcRDD prove better than bringing an entire table as DataFrame? I am later on, converting back to RDDs. 2. I lack of some proper criteria to decide a proper column for distributon. My table has more than 400 columns. Saif From: shenyan zhen [mailto:shenya...@gmail.com] Sent: Tuesday, July 28, 2015 4:16 PM To: Ellafi, Saif A. Cc: user@spark.apache.org Subject: Re: Fighting against performance: JDBC RDD badly distributed Hi Saif, Are you using JdbcRDD directly from Spark? If yes, then the poor distribution could be due to the bound key you used. See the JdbcRDD Scala doc at https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD: sql the text of the query. The query must contain two ? placeholders for parameters used to partition the results. E.g. select title, author from books where ? = id and id = ? lowerBound the minimum value of the first placeholder upperBound the maximum value of the second placeholder The lower and upper bounds are inclusive. numPartitions the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20) Shenyan On Tue, Jul 28, 2015 at 2:41 PM, saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com wrote: Hi all, I am experimenting and learning performance on big tasks locally, with a 32 cores node and more than 64GB of Ram, data is loaded from a database through JDBC driver, and launching heavy computations against it. I am presented with two questions: 1. My RDD is poorly distributed. I am partitioning into 32 pieces, but first 31 pieces are extremely lightweight compared to piece 32 15/07/28 13:37:48 INFO Executor: Finished task 30.0 in stage 0.0 (TID 30). 1419 bytes result sent to driver 15/07/28 13:37:48 INFO TaskSetManager: Starting task 31.0 in stage 0.0 (TID 31, localhost, PROCESS_LOCAL, 1539 bytes) 15/07/28 13:37:48 INFO Executor: Running task 31.0 in stage 0.0 (TID 31) 15/07/28 13:37:48 INFO TaskSetManager: Finished task 30.0 in stage 0.0 (TID 30) in 2798 ms on localhost (31/32) 15/07/28 13:37:48 INFO CacheManager: Partition rdd_2_31 not found, computing it ...All pieces take 3 seconds while last one takes around 15 minutes to compute... Is there anything I can do about this? preferrably without reshufling, i.e. in the DataFrameReader JDBC options (lowerBound, upperBound, partition column) 2. After long time of processing, sometimes I get OOMs, I fail to find a how-to for fallback and give retries to already persisted data to avoid time. Thanks, Saif
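For the DataFrameReader path, the partitioned overload of jdbc() takes a partition column plus bounds, and this is often where the skew comes from: as far as I can tell from the JDBC data source, rows outside [lowerBound, upperBound] are not dropped but folded into the first and last partitions, so an upperBound far below the real maximum of the column gives exactly the pattern of one huge trailing partition. A minimal Scala sketch, assuming an existing SparkContext `sc`, with the URL, table and column names as placeholders:

import java.util.Properties
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "pass")

// 32 range predicates are generated on ID; pick a column whose values are
// roughly uniform between lowerBound and upperBound.
val df = sqlContext.read.jdbc(
  url = "jdbc:oracle:thin:@//dbhost:1521/svc",   // hypothetical vendor URL
  table = "BIG_TABLE",
  columnName = "ID",
  lowerBound = 1L,
  upperBound = 32000000L,   // should be close to max(ID)
  numPartitions = 32,
  connectionProperties = props)

println(df.rdd.partitions.length)   // 32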
Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode
BTW this is most probably caused by this line in PythonRunner.scala: System.exit(process.waitFor()) The YARN backend doesn't like applications calling System.exit(). On Tue, Jul 28, 2015 at 12:00 PM, Marcelo Vanzin van...@cloudera.com wrote: This might be an issue with how pyspark propagates the error back to the AM. I'm pretty sure this does not happen for Scala / Java apps. Have you filed a bug? On Tue, Jul 28, 2015 at 11:17 AM, Elkhan Dadashov elkhan8...@gmail.com wrote: Thanks Corey for your answer, Do you mean that final status : SUCCEEDED in terminal logs means that YARN RM could clean the resources after the application has finished (application finishing does not necessarily mean succeeded or failed) ? With that logic it totally makes sense. Basically the YARN logs does not say anything about the Spark job itself. It just says that Spark job resources have been cleaned up after the job completed and returned back to Yarn. It would be great if Yarn logs could also say about the consequence of the job, because the user is interested in more about the job final status. Yarn related logs can be found in RM ,NM, DN, NN log files in detail. Thanks again. On Mon, Jul 27, 2015 at 7:45 PM, Corey Nolet cjno...@gmail.com wrote: Elkhan, What does the ResourceManager say about the final status of the job? Spark jobs that run as Yarn applications can fail but still successfully clean up their resources and give them back to the Yarn cluster. Because of this, there's a difference between your code throwing an exception in an executor/driver and the Yarn application failing. Generally you'll see a yarn application fail when there's a memory problem (too much memory being allocated or not enough causing executors to fail multiple times not allowing your job to finish). What I'm seeing from your post is that you had an exception in your application which was caught by the Spark framework which then proceeded to clean up the job and shut itself down- which it did successfully. When you aren't running in the Yarn modes, you aren't seeing any Yarn status that's telling you the Yarn application was successfully shut down, you are just seeing the failure(s) from your drivers/executors. On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Any updates on this bug ? Why Spark log results Job final status does not match ? (one saying that job has failed, another stating that job has succeeded) Thanks. On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, While running Spark Word count python example with intentional mistake in *Yarn cluster mode*, Spark terminal states final status as SUCCEEDED, but log files state correct results indicating that the job failed. Why terminal log output application log output contradict each other ? If i run same job on *local mode* then terminal logs and application logs match, where both state that job has failed to expected error in python script. More details: Scenario While running Spark Word count python example on *Yarn cluster mode*, if I make intentional error in wordcount.py by changing this line (I'm using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions - which i tested): lines = sc.textFile(sys.argv[1], 1) into this line: lines = sc.textFile(*nonExistentVariable*,1) where nonExistentVariable variable was never created and initialized. 
then i run that example with this command (I put README.md into HDFS before running this command): *./bin/spark-submit --master yarn-cluster wordcount.py /README.md* The job runs and finishes successfully according the log printed in the terminal : *Terminal logs*: ... 15/07/23 16:19:17 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:18 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:19 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:20 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:21 INFO yarn.Client: Application report for application_1437612288327_0013 (state: FINISHED) 15/07/23 16:19:21 INFO yarn.Client: client token: N/A diagnostics: Shutdown hook called before final status was reported. ApplicationMaster host: 10.0.53.59 ApplicationMaster RPC port: 0 queue: default start time: 1437693551439 final status: *SUCCEEDED* tracking URL: http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1 user: edadashov 15/07/23 16:19:21 INFO util.Utils: Shutdown hook called 15/07/23 16:19:21 INFO util.Utils: Deleting directory /tmp/spark-eba0a1b5-a216-4afa-9c54-a3cb67b16444 But if look at log files generated for this application in HDFS - it indicates failure of the job
Re: DataFrame DAG recomputed even though DataFrame is cached?
We will try to address this before Spark 1.5 is released: https://issues.apache.org/jira/browse/SPARK-9141 On Tue, Jul 28, 2015 at 11:50 AM, Kristina Rogale Plazonic kpl...@gmail.com wrote: Hi, I'm puzzling over the following problem: when I cache a small sample of a big dataframe, the small dataframe is recomputed when selecting a column (but not if show() or count() is invoked). Why is that so and how can I avoid recomputation of the small sample dataframe? More details: - I have a big dataframe df of ~190million rows and ~10 columns, obtained via 3 different joins; I cache it and invoke count() to make sure it really is in memory and confirm in web UI - val sdf = df.sample(false, 1e-6); sdf.cache(); sdf.count() // 170 lines; cached is also confirmed in webUI, size in memory is 150kB *- sdf.select(colname).show() // this triggers a complete recomputation of sdf with 3 joins!* - show(), count() or take() do not trigger the recomputation of the 3 joins, but select(), collect() or withColumn() do. I have --executor-memory 30G --driver-memory 10g, so memory is not a problem. I'm using Spark 1.4.0. Could anybody shed some light on this or where I can find more info? Many thanks, Kristina
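Until that is fixed, one possible workaround (a sketch only, assuming the `df` and `sqlContext` from the post above) is to cut the lineage of the tiny sample entirely: collect its ~170 rows to the driver and build a fresh DataFrame from them, so that select()/withColumn() have nothing to fall back to even when the cache is bypassed. The collect() may still pay the join cost once, but nothing after it can.

// Spark 1.4-style sketch; `df` and `sqlContext` are assumed from the post above.
val sdf = df.sample(false, 1e-6).cache()
sdf.count()                              // materialize the cached sample

// Re-create the sample as a DataFrame with no lineage back to the three joins.
val localRows = sdf.collect()
val smallDF = sqlContext.createDataFrame(
  sqlContext.sparkContext.parallelize(localRows.toSeq), sdf.schema)

smallDF.select("colname").show()         // no recomputation possible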
restart from last successful stage
Is it possible to restart a job from the last successful stage instead of from the beginning? For example, if a job has stages 0, 1 and 2, and stage 0 takes a long time and succeeds but the job fails in stage 1, it would be useful to be able to restart from the output of stage 0 rather than recompute everything. Note that I am NOT talking about Spark Streaming, just Spark Core (and DataFrames); I am not sure whether the answer would be different with Streaming. Thanks.
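As far as I know there is no built-in way in Spark core to resume a failed job from an intermediate stage across separate runs; the usual approach is to write the expensive stage's output to durable storage yourself and reload it on restart. A minimal Scala sketch of that pattern, where the paths, element types and the "expensive" computation are all placeholder assumptions:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ResumableJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ResumableJob"))
    val stage0Path = "hdfs:///tmp/myjob/stage0"   // hypothetical resume point

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val stage0: RDD[(String, Int)] =
      if (fs.exists(new Path(stage0Path))) {
        // Re-run: skip the expensive work and reload its saved output.
        sc.objectFile[(String, Int)](stage0Path)
      } else {
        // First run: do the expensive work once and persist it durably.
        val computed = sc.textFile("hdfs:///data/input")
          .map(line => (line, 1))
          .reduceByKey(_ + _)
        computed.saveAsObjectFile(stage0Path)
        computed
      }

    // The later "stages" start from the saved output when the job is restarted.
    stage0.filter(_._2 > 1).saveAsTextFile("hdfs:///data/output")
    sc.stop()
  }
}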
Re: Fighting against performance: JDBC RDD badly distributed
Saif, I am guessing but not sure your use case. Are you retrieving the entire table into Spark? If yes, do you have primary key on your table? If also yes, then JdbcRDD should be efficient. DataFrameReader.jdbc gives you more options, again, depends on your use case. Possible for you to describe your objective and show some code snippet? Shenyan On Tue, Jul 28, 2015 at 3:23 PM, saif.a.ell...@wellsfargo.com wrote: Thank you for your response Zhen, I am using some vendor specific JDBC driver JAR file (honestly I dont know where it came from). It’s api is NOT like JdbcRDD, instead, more like jdbc from DataFrameReader https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader So I ask two questions now: 1. Will running a query using JdbcRDD prove better than bringing an entire table as DataFrame? I am later on, converting back to RDDs. 2. I lack of some proper criteria to decide a proper column for distributon. My table has more than 400 columns. Saif *From:* shenyan zhen [mailto:shenya...@gmail.com] *Sent:* Tuesday, July 28, 2015 4:16 PM *To:* Ellafi, Saif A. *Cc:* user@spark.apache.org *Subject:* Re: Fighting against performance: JDBC RDD badly distributed Hi Saif, Are you using JdbcRDD directly from Spark? If yes, then the poor distribution could be due to the bound key you used. See the JdbcRDD Scala doc at https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD : *sql* the text of the query. The query must contain two ? placeholders for parameters used to partition the results. E.g. select title, author from books where ? = id and id = ? *lowerBound* the minimum value of the first placeholder *upperBound* the maximum value of the second placeholder The lower and upper bounds are inclusive. *numPartitions * the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20) Shenyan On Tue, Jul 28, 2015 at 2:41 PM, saif.a.ell...@wellsfargo.com wrote: Hi all, I am experimenting and learning performance on big tasks locally, with a 32 cores node and more than 64GB of Ram, data is loaded from a database through JDBC driver, and launching heavy computations against it. I am presented with two questions: 1. My RDD is poorly distributed. I am partitioning into 32 pieces, but first 31 pieces are extremely lightweight compared to piece 32 15/07/28 13:37:48 INFO Executor: Finished task 30.0 in stage 0.0 (TID 30). 1419 bytes result sent to driver 15/07/28 13:37:48 INFO TaskSetManager: Starting task 31.0 in stage 0.0 (TID 31, localhost, PROCESS_LOCAL, 1539 bytes) 15/07/28 13:37:48 INFO Executor: Running task 31.0 in stage 0.0 (TID 31) 15/07/28 13:37:48 INFO TaskSetManager: Finished task 30.0 in stage 0.0 (TID 30) in 2798 ms on localhost (31/32) 15/07/28 13:37:48 INFO CacheManager: Partition rdd_2_31 not found, computing it *...All pieces take 3 seconds while last one takes around 15 minutes to compute...* Is there anything I can do about this? preferrably without reshufling, i.e. in the DataFrameReader JDBC options (lowerBound, upperBound, partition column) 2. After long time of processing, sometimes I get OOMs, I fail to find a how-to for fallback and give retries to already persisted data to avoid time. Thanks, Saif
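When there is no single numeric column with a healthy spread (400 columns, no obvious key), one option is the predicates overload of DataFrameReader.jdbc, which takes one WHERE clause per partition and skips the bounds-based splitting entirely. A hedged sketch, assuming an existing SparkContext `sc`; the column name and date ranges are made up for illustration and should be chosen so each predicate covers a similar number of rows:

import java.util.Properties
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "pass")

// One predicate per partition; each becomes the WHERE clause of its own query.
val predicates = Array(
  "TRADE_DATE >= '2015-01-01' AND TRADE_DATE < '2015-04-01'",
  "TRADE_DATE >= '2015-04-01' AND TRADE_DATE < '2015-07-01'",
  "TRADE_DATE >= '2015-07-01' AND TRADE_DATE < '2015-10-01'",
  "TRADE_DATE >= '2015-10-01' AND TRADE_DATE < '2016-01-01'")

val df = sqlContext.read.jdbc(
  "jdbc:somevendor://dbhost/mydb",   // hypothetical vendor JDBC URL
  "BIG_TABLE",
  predicates,
  props)

println(df.rdd.partitions.length)   // 4, one per predicate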
Re: Actor not found for: ActorSelection
The problem was that I was trying to start the example app in standalone cluster mode by passing in *-Dspark.master=spark://myhost:7077* as an argument to the JVM. I launched the example app locally using -*Dspark.master=local* and it worked.
Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode
But then how can we get to know if job is making progress in programmatic way (Java) ? Or if job has failed or succeeded ? Is looking to application log files the only way knowing about job final status (failed/succeeded) ? Because when job fails Job History server does not have much info about the job. In my particular case, if there was error before any Job stage started, then job history server will not have useful info about job progress or status. Then how can user track the Spark job status ? I'm launching Spark jobs through SparkLauncher in Java, then it becomes more difficult to know about the job status. On Tue, Jul 28, 2015 at 11:17 AM, Elkhan Dadashov elkhan8...@gmail.com wrote: Thanks Corey for your answer, Do you mean that final status : SUCCEEDED in terminal logs means that YARN RM could clean the resources after the application has finished (application finishing does not necessarily mean succeeded or failed) ? With that logic it totally makes sense. Basically the YARN logs does not say anything about the Spark job itself. It just says that Spark job resources have been cleaned up after the job completed and returned back to Yarn. It would be great if Yarn logs could also say about the consequence of the job, because the user is interested in more about the job final status. Yarn related logs can be found in RM ,NM, DN, NN log files in detail. Thanks again. On Mon, Jul 27, 2015 at 7:45 PM, Corey Nolet cjno...@gmail.com wrote: Elkhan, What does the ResourceManager say about the final status of the job? Spark jobs that run as Yarn applications can fail but still successfully clean up their resources and give them back to the Yarn cluster. Because of this, there's a difference between your code throwing an exception in an executor/driver and the Yarn application failing. Generally you'll see a yarn application fail when there's a memory problem (too much memory being allocated or not enough causing executors to fail multiple times not allowing your job to finish). What I'm seeing from your post is that you had an exception in your application which was caught by the Spark framework which then proceeded to clean up the job and shut itself down- which it did successfully. When you aren't running in the Yarn modes, you aren't seeing any Yarn status that's telling you the Yarn application was successfully shut down, you are just seeing the failure(s) from your drivers/executors. On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Any updates on this bug ? Why Spark log results Job final status does not match ? (one saying that job has failed, another stating that job has succeeded) Thanks. On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, While running Spark Word count python example with intentional mistake in *Yarn cluster mode*, Spark terminal states final status as SUCCEEDED, but log files state correct results indicating that the job failed. Why terminal log output application log output contradict each other ? If i run same job on *local mode* then terminal logs and application logs match, where both state that job has failed to expected error in python script. 
More details: Scenario While running Spark Word count python example on *Yarn cluster mode*, if I make intentional error in wordcount.py by changing this line (I'm using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions - which i tested): lines = sc.textFile(sys.argv[1], 1) into this line: lines = sc.textFile(*nonExistentVariable*,1) where nonExistentVariable variable was never created and initialized. then i run that example with this command (I put README.md into HDFS before running this command): *./bin/spark-submit --master yarn-cluster wordcount.py /README.md* The job runs and finishes successfully according the log printed in the terminal : *Terminal logs*: ... 15/07/23 16:19:17 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:18 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:19 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:20 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:21 INFO yarn.Client: Application report for application_1437612288327_0013 (state: FINISHED) 15/07/23 16:19:21 INFO yarn.Client: client token: N/A diagnostics: Shutdown hook called before final status was reported. ApplicationMaster host: 10.0.53.59 ApplicationMaster RPC port: 0 queue: default start time: 1437693551439 final status: *SUCCEEDED* tracking URL: http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1 user: edadashov 15/07/23 16:19:21 INFO util.Utils: Shutdown hook
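On the SparkLauncher question above: in Spark 1.4 the launcher returns a plain java.lang.Process, so from Java/Scala code the exit code of the spark-submit child process is essentially what you have to go on (plus the YARN application logs). A minimal sketch, with the paths, class name and settings as placeholder assumptions; note that, as this thread shows, for PySpark apps in yarn-cluster mode the reported final status itself can be misleading:

import org.apache.spark.launcher.SparkLauncher

object LaunchAndWait {
  def main(args: Array[String]): Unit = {
    // All values below are hypothetical; point them at your own build and app.
    val process = new SparkLauncher()
      .setSparkHome("/opt/spark")
      .setAppResource("/apps/myjob.jar")
      .setMainClass("com.example.MyJob")
      .setMaster("yarn-cluster")
      .setConf("spark.executor.memory", "4g")
      .launch()                        // returns a java.lang.Process in Spark 1.4

    val exitCode = process.waitFor()   // 0 normally means spark-submit reported success
    println(s"spark-submit exited with code $exitCode")
  }
}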
Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode
Thanks a lot for feedback, Marcelo. I've filed a bug just now - SPARK-9416 https://issues.apache.org/jira/browse/SPARK-9416 On Tue, Jul 28, 2015 at 12:14 PM, Marcelo Vanzin van...@cloudera.com wrote: BTW this is most probably caused by this line in PythonRunner.scala: System.exit(process.waitFor()) The YARN backend doesn't like applications calling System.exit(). On Tue, Jul 28, 2015 at 12:00 PM, Marcelo Vanzin van...@cloudera.com wrote: This might be an issue with how pyspark propagates the error back to the AM. I'm pretty sure this does not happen for Scala / Java apps. Have you filed a bug? On Tue, Jul 28, 2015 at 11:17 AM, Elkhan Dadashov elkhan8...@gmail.com wrote: Thanks Corey for your answer, Do you mean that final status : SUCCEEDED in terminal logs means that YARN RM could clean the resources after the application has finished (application finishing does not necessarily mean succeeded or failed) ? With that logic it totally makes sense. Basically the YARN logs does not say anything about the Spark job itself. It just says that Spark job resources have been cleaned up after the job completed and returned back to Yarn. It would be great if Yarn logs could also say about the consequence of the job, because the user is interested in more about the job final status. Yarn related logs can be found in RM ,NM, DN, NN log files in detail. Thanks again. On Mon, Jul 27, 2015 at 7:45 PM, Corey Nolet cjno...@gmail.com wrote: Elkhan, What does the ResourceManager say about the final status of the job? Spark jobs that run as Yarn applications can fail but still successfully clean up their resources and give them back to the Yarn cluster. Because of this, there's a difference between your code throwing an exception in an executor/driver and the Yarn application failing. Generally you'll see a yarn application fail when there's a memory problem (too much memory being allocated or not enough causing executors to fail multiple times not allowing your job to finish). What I'm seeing from your post is that you had an exception in your application which was caught by the Spark framework which then proceeded to clean up the job and shut itself down- which it did successfully. When you aren't running in the Yarn modes, you aren't seeing any Yarn status that's telling you the Yarn application was successfully shut down, you are just seeing the failure(s) from your drivers/executors. On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Any updates on this bug ? Why Spark log results Job final status does not match ? (one saying that job has failed, another stating that job has succeeded) Thanks. On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, While running Spark Word count python example with intentional mistake in *Yarn cluster mode*, Spark terminal states final status as SUCCEEDED, but log files state correct results indicating that the job failed. Why terminal log output application log output contradict each other ? If i run same job on *local mode* then terminal logs and application logs match, where both state that job has failed to expected error in python script. 
More details: Scenario While running Spark Word count python example on *Yarn cluster mode*, if I make intentional error in wordcount.py by changing this line (I'm using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions - which i tested): lines = sc.textFile(sys.argv[1], 1) into this line: lines = sc.textFile(*nonExistentVariable*,1) where nonExistentVariable variable was never created and initialized. then i run that example with this command (I put README.md into HDFS before running this command): *./bin/spark-submit --master yarn-cluster wordcount.py /README.md* The job runs and finishes successfully according the log printed in the terminal : *Terminal logs*: ... 15/07/23 16:19:17 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:18 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:19 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:20 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:21 INFO yarn.Client: Application report for application_1437612288327_0013 (state: FINISHED) 15/07/23 16:19:21 INFO yarn.Client: client token: N/A diagnostics: Shutdown hook called before final status was reported. ApplicationMaster host: 10.0.53.59 ApplicationMaster RPC port: 0 queue: default start time: 1437693551439 final status: *SUCCEEDED* tracking URL: http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1 user: edadashov 15/07/23 16:19:21 INFO util.Utils: Shutdown hook
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
there's a spark-submit.cmd file for windows. Does that work? On 27 Jul 2015, at 21:19, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Spark Users Looks like Spark 1.4.0 cannot work with Cygwin due to the removing of Cygwin support in bin/spark-class The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3 The changeset said Add a library for launching Spark jobs programmatically, but how to use it in Cygwin? I'm wondering any solutions available to make it work in Windows? Thanks Proust
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Stahlman, finalRDDStorageLevel is the storage level for the final user/item factors. It is not common to set it to StorageLevel.NONE, unless you want to save the factors directly to disk. So if it is NONE, we cannot unpersist the intermediate RDDs (in/out blocks) because the final user/item factors returned are not materialized. Otherwise, we have to recompute from the very beginning (or last checkpoint) when you materialize the final user/item factors. If you need want to have multiple runs, you can try to set finalRDDStorageLevel to MEMORY_AND_DISK, or clean previous runs so the cached RDDs get garbage collected. Best, Xiangrui On Wed, Jul 22, 2015 at 11:35 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: To be Unpersisted the RDD must be persisted first. If it's set to None, then it's not persisted, and as such does not need to be freed. Does that make sense ? Thank you, Ilya Ganelin -Original Message- From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com] Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time To: user@spark.apache.org Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello again, In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around. Jonathan From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com Date: Thursday, July 16, 2015 at 2:18 PM To: user@spark.apache.org user@spark.apache.org Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample code in python is copied below. The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in Pyspark to unpersist() these RDDs after each iteration? The names of the RDDs which I gather from the UI is: itemInBlocks itemOutBlocks Products ratingBlocks userInBlocks userOutBlocks users I am using Spark 1.3. Thank you for any help! 
Regards, Jonathan

data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)

functions = [rating]   # defined elsewhere
ranks = [10,20]
iterations = [10,20]
lambdas = [0.01,0.1]
alphas = [1.0,50.0]

results = []
for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ):
    # train model
    ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) )
    # test performance on CV data
    ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    auc = areaUnderCurve( ratings_cv, model.predictAll )
    # save results
    result = ",".join(str(l) for l in [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
    results.append(result)
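A Scala sketch of the same kind of sweep, under the assumption that dropping each model's cached factor RDDs between configurations is enough to keep executor memory in check; ratingsTrain and evaluate stand in for the data and the AUC computation in the Python code above, and I have not verified that the equivalent calls behave identically from PySpark:

import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// Sketch only: sweep a few ALS configurations and free each model's cached
// RDDs before training the next one.
def sweep(ratingsTrain: RDD[Rating],
          evaluate: MatrixFactorizationModel => Double): Seq[String] = {
  for {
    rank   <- Seq(10, 20)
    iters  <- Seq(10, 20)
    lambda <- Seq(0.01, 0.1)
    alpha  <- Seq(1.0, 50.0)
  } yield {
    val model = ALS.trainImplicit(ratingsTrain, rank, iters, lambda, alpha)
    val auc = evaluate(model)

    // Drop this run's cached user/product factors before the next run.
    model.userFeatures.unpersist()
    model.productFeatures.unpersist()

    // If the intermediate block RDDs (itemInBlocks, userOutBlocks, ...) are
    // still cached in your Spark version, they can be evicted by name as well.
    ratingsTrain.sparkContext.getPersistentRDDs.values
      .filter(r => r.name != null && r.name.contains("Blocks"))
      .foreach(_.unpersist())

    Seq(rank, iters, lambda, alpha, auc).mkString(",")
  }
}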
Re: Proper saving/loading of MatrixFactorizationModel
The partitioner is not saved with the RDD. So when you load the model back, we lose the partitioner information. You can call repartition on the user/product factors and then create a new MatrixFactorizationModel object using the repartitioned RDDs. It would be useful to create a utility method for this, e.g., `MatrixFactorizationModel.repartition(num: Int): MatrixFactorizationModel`. -Xiangrui On Wed, Jul 22, 2015 at 4:34 AM, PShestov pshes...@nvidia.com wrote: Hi all! I have MatrixFactorizationModel object. If I'm trying to recommend products to single user right after constructing model through ALS.train(...) then it takes 300ms (for my data and hardware). But if I save model to disk and load it back then recommendation takes almost 2000ms. Also Spark warns: 15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow. 15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow. 15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow. 15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow. How can I create/set partitioner and cache user and product factors after loading model? Following approach didn't help: model.userFeatures().cache(); model.productFeatures().cache(); Also I was trying to repartition those rdds and create new model from repartitioned versions but that also didn't help.
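A sketch of that utility written as user code, using the public MatrixFactorizationModel constructor. Note it uses partitionBy with a HashPartitioner rather than a plain repartition, since the warnings are specifically about a missing partitioner; the partition count and persistence level are arbitrary choices for illustration.

import org.apache.spark.HashPartitioner
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.storage.StorageLevel

def repartitionModel(model: MatrixFactorizationModel, numPartitions: Int): MatrixFactorizationModel = {
  val partitioner = new HashPartitioner(numPartitions)

  // Attach a partitioner to the factor RDDs and cache them.
  val userFactors = model.userFeatures
    .partitionBy(partitioner).persist(StorageLevel.MEMORY_AND_DISK)
  val productFactors = model.productFeatures
    .partitionBy(partitioner).persist(StorageLevel.MEMORY_AND_DISK)

  // Force materialization so the first predict() call is not the one paying for it.
  userFactors.count()
  productFactors.count()

  new MatrixFactorizationModel(model.rank, userFactors, productFactors)
}

// Usage after loading from disk (path is hypothetical):
// val loaded = MatrixFactorizationModel.load(sc, "hdfs:///models/als")
// val servingModel = repartitionModel(loaded, 16)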
Re: NO Cygwin Support in bin/spark-class in Spark 1.4.0
It wasn't removed, but rewritten. Cygwin is just a distribution of POSIX-related utilities so you should be able to use the normal .sh scripts. In any event, you didn't say what the problem is? On Tue, Jul 28, 2015 at 5:19 AM, Proust GZ Feng pf...@cn.ibm.com wrote: Hi, Spark Users Looks like Spark 1.4.0 cannot work with Cygwin due to the removing of Cygwin support in bin/spark-class The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3 The changeset said Add a library for launching Spark jobs programmatically, but how to use it in Cygwin? I'm wondering any solutions available to make it work in Windows? Thanks Proust