Windowed stream operations -- These are too lazy for some use cases
We are aggregating real time logs of events, and want to do windows of 30 minutes. However, since the computation doesn't start until 30 minutes have passed, there is a ton of data built up that processing could've already started on. When it comes time to actually process the data, there is too much for our cluster to handle at once. The basic idea is this:

  val mergedMain = mergedStream
    .flatMap(r => ...) // denormalize data for this particular output stream
    .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
    .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
    .map(rec => ...) // convert data to other format
    .foreachRDD { (rdd, time) =>
      rdd.saveAsTextFile(...) // save to text files
    }

I would want the batches to be reduced as soon as they arrive (the first reduceByKey), since there isn't any reason to wait. Instead all of the unprocessed data has to be processed at the same time (this data is being heavily denormalized in some cases, and so generates a bunch of additional data). Thanks for any help.
Re: How to list all dataframes and RDDs available in current session?
Apologies, I accidentally included the Spark User DL on BCC. The actual email message is below.

Hi: I have been working on a few examples using Zeppelin. I have been trying to find a command that would list all *dataframes/RDDs* that have been created in the current session. Does anyone know if any such command is available? Something similar to Spark SQL's way to list all temp tables: show tables; Thanks, Dhaval
Run scala code with spark submit
Is there any way to run a standalone Scala program via spark-submit? Or do I always have to put it in a package and build it with Maven (or sbt)? What if I have just a simple program, like the example word counter? Could anyone please show it on this simple test file, Greeting.scala? It compiles with scalac and runs with scala. Now I want to run it with Spark (I can get these files via wget, for example). So, how can I run a one-file Scala program via spark-submit?
Re: Run scala code with spark submit
I haven't tried it, but spark-shell should work if you give it a Scala script file, since it's basically a wrapper around the Scala REPL. dean

On Thursday, August 20, 2015, MasterSergius master.serg...@gmail.com wrote:
  Is there any way to run a standalone Scala program via spark-submit? Or do I always have to put it in a package and build it with Maven (or sbt)? What if I have just a simple program, like the example word counter? Could anyone please show it on this simple test file, Greeting.scala? It compiles with scalac and runs with scala. Now I want to run it with Spark (I can get these files via wget, for example). So, how can I run a one-file Scala program via spark-submit?

--
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com
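For reference, a minimal sketch of what that could look like; the file name and contents of Greeting.scala here are illustrative, not from the original post:

  // Greeting.scala -- a script-style program with no object/main wrapper,
  // intended to be fed to the REPL. `sc` is the SparkContext that
  // spark-shell creates for you.
  val greetings = sc.parallelize(Seq("hello", "world"))
  greetings.collect().foreach(println)

  // Run it with the shell rather than spark-submit:
  //   ./bin/spark-shell -i Greeting.scala
  //
  // spark-submit itself expects a packaged application jar (or a Python file),
  // so a true spark-submit run would need an object with a main() method and
  // an sbt/Maven build producing that jar.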
Re: insert overwrite table phonesall in spark-sql resulted in java.io.StreamCorruptedException
The answer is that my table was not serialized by Kryo, but I started the spark-sql shell with Kryo, so the data could not be deserialized.
Re: persist for DStream
Are you asking for something more than this? http://spark.apache.org/docs/latest/streaming-programming-guide.html#caching--persistence

On Thu, Aug 20, 2015 at 2:09 PM, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote:
  Hi, there are functions available to cache() or persist() an RDD in memory, but I am reading data from Kafka in the form of a DStream and applying operations on it, and I want to persist that DStream in memory for further use. Please suggest a method for how I can persist a DStream in memory. Regards, Deepesh
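A minimal sketch of what the linked caching section describes; the `kafkaStream` name and the storage level are assumptions for illustration:

  import org.apache.spark.storage.StorageLevel

  // A DStream exposes the same cache()/persist() calls as an RDD; every RDD
  // the stream produces for a batch is then kept at the chosen storage level.
  val lines = kafkaStream.map(_._2)              // values of the Kafka (key, message) pairs
  lines.persist(StorageLevel.MEMORY_ONLY_SER)

  // Window and state operations (reduceByKeyAndWindow, updateStateByKey)
  // persist their input automatically, so an explicit persist() mainly matters
  // when the same DStream feeds several independent computations.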
Transformation not happening for reduceByKey or GroupByKey
Hi All, I have data in an RDD as mentioned below:

  RDD: Array[(Int, Int)] = Array((0,1), (0,2), (1,20), (1,30), (2,40))

I am expecting output as Array((0,3), (1,50), (2,40)), just a sum function on the values for each key.

Code:
  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)

Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at console:73
  res: Array[(Int,Int)] = Array()

Command as mentioned:
  dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

Please let me know what is missing in my code, as my resultant Array is empty. Regards, Satish
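For comparison, a self-contained sketch of the intended transformation; note that reduceByKey returns a new RDD which has to be captured and acted on. The variable names here are illustrative, not from the post:

  val pairs = sc.parallelize(Seq((0, 1), (0, 2), (1, 20), (1, 30), (2, 40)))

  // reduceByKey does not modify `pairs`; it builds a new, lazily evaluated RDD.
  val summed = pairs.reduceByKey(_ + _)

  // The action triggers the shuffle and materialises the sums per key.
  summed.collect().foreach(println)   // (0,3), (1,50), (2,40)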
How to get the radius of clusters in spark K means
We can get cluster centers in K-means clustering. Likewise, is there any method in Spark to get the cluster radius?
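MLlib does not expose a radius directly, but one can be derived from the fitted model. A hedged sketch, assuming "radius" means the maximum distance from a point to its assigned centre; the `model` and `points` values are assumed to exist already:

  import org.apache.spark.mllib.clustering.KMeansModel
  import org.apache.spark.mllib.linalg.{Vector, Vectors}
  import org.apache.spark.rdd.RDD

  def clusterRadii(model: KMeansModel, points: RDD[Vector]): Map[Int, Double] =
    points
      .map { p =>
        val cluster = model.predict(p)   // assign the point to its centre
        val dist = math.sqrt(Vectors.sqdist(p, model.clusterCenters(cluster)))
        (cluster, dist)
      }
      .reduceByKey(math.max)             // radius = farthest member per cluster
      .collect()
      .toMap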
SparkSQL concerning materials
Hi, I would like to dip into SparkSQL and get to know the architecture, good practices, and some internals better. Could you recommend some materials on this matter? Regards Dawid
persist for DStream
Hi, there are functions available to cache() or persist() an RDD in memory, but I am reading data from Kafka in the form of a DStream and applying operations on it, and I want to persist that DStream in memory for further use. Please suggest a method for how I can persist a DStream in memory. Regards, Deepesh
Re: spark kafka partitioning
If you have 1 topic, that means you have 1 DStream, which will have a series of RDDs for each batch interval. In receiver-based integration, there is no direct relationship between Kafka partitions and Spark partitions. In the direct approach, 1 Spark partition will be created for each Kafka partition.

On Fri, Aug 21, 2015 at 12:48 PM, Gaurav Agarwal gaurav130...@gmail.com wrote:
  Hello, regarding Spark Streaming and Kafka partitioning: when I send messages on a Kafka topic with 3 partitions and listen on a Kafka receiver with local[4], how will I come to know in Spark Streaming that different DStreams are created according to the partitions of the Kafka messages? Thanks

--
Best Regards, Ayan Guha
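A minimal sketch of the direct approach being described, where each batch's RDD mirrors the topic's partition count; `ssc`, the broker address and the topic name are placeholders:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

  // Direct stream: no receivers; each batch's RDD has one partition per Kafka
  // partition, so a 3-partition topic gives 3-partition RDDs.
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("events"))

  stream.foreachRDD { rdd =>
    println(s"Partitions in this batch: ${rdd.partitions.length}")
  }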
Spark-Cassandra-connector
Hi All, I need to write an RDD to Cassandra using the Spark Cassandra Connector from DataStax. My application is using YARN.

*Some basic questions:*
1. Will a call to saveToCassandra(...) use the same connection object for all tasks in a given executor? I mean, is there 1 (one) connection object per executor that is shared between tasks?
2. If the above answer is YES, is there a way to create a connection pool for each executor, so that multiple tasks can dump data to Cassandra in parallel?

Regards, Samya
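For context, a hedged sketch of the two connector entry points the question is about; the API names are from the DataStax connector of that era, and the keyspace, table and statement are placeholders, so treat the details as assumptions rather than an answer to the pooling question:

  import com.datastax.spark.connector._
  import com.datastax.spark.connector.cql.CassandraConnector

  // Usual path: the connector manages a per-executor session cache internally.
  rdd.saveToCassandra("my_keyspace", "my_table")

  // Explicit session use, e.g. when issuing custom statements per partition.
  // CassandraConnector is serializable, so it is built on the driver and shipped.
  val connector = CassandraConnector(sc.getConf)
  rdd.foreachPartition { rows =>
    connector.withSessionDo { session =>
      rows.foreach(row => session.execute("/* INSERT ... */"))  // statement elided
    }
  }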
Spark 1.3. Insert into hive parquet partitioned table from DataFrame
Hi. I have a dataframe and I want to insert this data into a Parquet partitioned table in Hive. In Spark 1.4 I can use

  df.write.partitionBy("x", "y").format("parquet").mode("append").saveAsTable("tbl_parquet")

but in Spark 1.3 I can't. How can I do it? Thanks -- Regards Miguel
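No answer appears in this digest; one commonly suggested workaround for 1.3 was to register the DataFrame as a temp table and let HiveContext do a dynamic-partition insert. This is a hedged sketch only, since whether dynamic-partition inserts work depends on the Hive support in the specific build; `hiveContext`, the table and the column names are assumptions:

  // df has payload columns plus the partition columns x and y.
  df.registerTempTable("tmp_df")

  hiveContext.sql("SET hive.exec.dynamic.partition = true")
  hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

  // Partition columns must come last in the SELECT for a dynamic-partition insert.
  hiveContext.sql(
    "INSERT INTO TABLE tbl_parquet PARTITION (x, y) SELECT value_col, x, y FROM tmp_df")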
Re: How to overwrite partition when writing Parquet?
Cheng - what if I want to overwrite a specific partition? I'll have to remove the folder, as Hemant suggested...

On Thu, Aug 20, 2015 at 1:17 PM Cheng Lian lian.cs@gmail.com wrote:
  You can apply a filter first to filter out data of the needed dates and then append them. Cheng
  On 8/20/15 4:59 PM, Hemant Bhanawat wrote:
    How can I overwrite only a given partition or manually remove a partition before writing? I don't know if (and I don't think) there is a way to do that using a mode. But doesn't manually deleting the directory of a particular partition help? For directory structure, check this out... http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
    On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman r...@totango.com wrote:
      Hello, I have a DataFrame with a date column which I want to use as a partition. Each day I want to write the data for the same date in Parquet, and then read a dataframe for a date range. I'm using: myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir); If I use SaveMode.Append, then writing data for the same partition adds the same data there again. If I use SaveMode.Overwrite, then writing data for a single partition removes all the data for all partitions. How can I overwrite only a given partition or manually remove a partition before writing? Many thanks! Romi K.
Re: How to add a new column with date duration from 2 date columns in a dataframe
Apologies, sent too early accidentally. The actual message is below.

A dataframe has 2 date columns (datetime type) and I would like to add another column that would have the difference between these two dates. A dataframe snippet is below.

new_df.show(5)
  +-----------+----------+--------------+
  |      PATID|   SVCDATE|next_diag_date|
  +-----------+----------+--------------+
  |12345655545|2012-02-13|    2012-02-13|
  |12345655545|2012-02-13|    2012-02-13|
  |12345655545|2012-02-13|    2012-02-27|
  +-----------+----------+--------------+

Here is what I have tried so far:

  - new_df.withColumn('SVCDATE2', (new_df.next_diag_date-new_df.SVCDATE)).show()
    Error: DateType does not support numeric operations
  - new_df.withColumn('SVCDATE2', (new_df.next_diag_date-new_df.SVCDATE).days).show()
    Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);

However this simple Python code works fine with PySpark:

  from datetime import date
  d0 = date(2008, 8, 18)
  d1 = date(2008, 9, 26)
  delta = d0 - d1
  print (d0 - d1).days # -39

Any suggestions would be appreciated! Also, is there a way to add a new column in a dataframe without using a column expression (e.g. like in pandas or R: df$new_col = 'new col value')?

Thanks, Dhaval
Re: How to overwrite partition when writing Parquet?
You can apply a filter first to filter out data of the needed dates and then append them. Cheng

On 8/20/15 4:59 PM, Hemant Bhanawat wrote:
  How can I overwrite only a given partition or manually remove a partition before writing? I don't know if (and I don't think) there is a way to do that using a mode. But doesn't manually deleting the directory of a particular partition help? For directory structure, check this out... http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
  On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman r...@totango.com wrote:
    Hello, I have a DataFrame with a date column which I want to use as a partition. Each day I want to write the data for the same date in Parquet, and then read a dataframe for a date range. I'm using: myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir); If I use SaveMode.Append, then writing data for the same partition adds the same data there again. If I use SaveMode.Overwrite, then writing data for a single partition removes all the data for all partitions. How can I overwrite only a given partition or manually remove a partition before writing? Many thanks! Romi K.
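Putting the two suggestions in this thread together, a hedged sketch of "delete that partition's directory, then append only that day's rows"; the path layout follows the partition-discovery convention and the `day` value is illustrative:

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.sql.SaveMode

  val day = "2015-08-19"
  val partitionPath = new Path(s"$parquetDir/date=$day")   // layout produced by partitionBy("date")

  // Manually remove just that partition's directory.
  val fs = FileSystem.get(sc.hadoopConfiguration)
  if (fs.exists(partitionPath)) fs.delete(partitionPath, true)

  // Re-write only that day's rows; Append leaves the other partitions untouched.
  myDataframe
    .filter(myDataframe("date") === day)
    .write
    .partitionBy("date")
    .mode(SaveMode.Append)
    .parquet(parquetDir)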
[SparkR] How to perform a for loop on a DataFrame object
Hi guys, First of all, thank you for your amazing work. As you can see in the subject, I post here because I need to perform a for loop on a DataFrame object. Sample of my dataset (the entire dataset is ~400k lines long): I use Spark 1.4.1 with R 3.2.1. I launch sparkR using (the package can be found at http://spark-packages.org/package/databricks/spark-csv ) I load my dataset from HDFS using the following command (the package is needed to load a CSV into a Spark DataFrame): When I do a summary, the output is: What I need to do is to calculate: But you probably know that we can't do this, because the read.df function returns an S4 object and it is not an iterable object. Does anyone know how I can do that? Maybe I have to convert the type of the DataFrame or use another function to load my dataset... I have to say that I'm new to Spark and SparkR :) Thanks for your time, Florian
Convert mllib.linalg.Matrix to Breeze
Hi All, Is there any way to convert an MLlib matrix to a Breeze DenseMatrix? Any leads are appreciated. Thanks, Naveen
Re: Convert mllib.linalg.Matrix to Breeze
You can use Matrix.toBreeze() https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56 .

2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com:
  Hi All, Is there any way to convert an MLlib matrix to a Breeze DenseMatrix? Any leads are appreciated. Thanks, Naveen
DataFrameWriter.jdbc is very slow
We want to migrate our data (approximately 20M rows) from Parquet to Postgres. When we use the DataFrame writer's jdbc method, the execution time is very long; we have tried the same with batch inserts and that was much more effective. Is it intentionally implemented in that way?
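For reference, a hedged sketch of the kind of hand-rolled batch insert the poster compared against; the connection URL, credentials, table and two-column row shape are all placeholders, not from the post:

  import java.sql.DriverManager

  df.foreachPartition { rows =>
    // One connection per partition, statements accumulated and flushed in batches.
    val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pass")
    conn.setAutoCommit(false)
    val stmt = conn.prepareStatement("INSERT INTO target_table (a, b) VALUES (?, ?)")
    try {
      rows.grouped(1000).foreach { batch =>
        batch.foreach { row =>
          stmt.setString(1, row.getString(0))
          stmt.setInt(2, row.getInt(1))
          stmt.addBatch()
        }
        stmt.executeBatch()
      }
      conn.commit()
    } finally {
      stmt.close()
      conn.close()
    }
  }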
How to add a new column with date duration from 2 date columns in a dataframe
new_df.withColumn('SVCDATE2', (new_df.next_diag_date-new_df.SVCDATE).days).show()

  +-----------+----------+--------------+
  |      PATID|   SVCDATE|next_diag_date|
  +-----------+----------+--------------+
  |12345655545|2012-02-13|    2012-02-13|
  |12345655545|2012-02-13|    2012-02-13|
  |12345655545|2012-02-13|    2012-02-27|
  +-----------+----------+--------------+
Data locality with HDFS not being seen
Hello. I am seeing some unexpected issues with achieving HDFS data locality. I expect the tasks to be executed only on the node which has the data, but this is not happening (of course, unless the node is busy, in which case I understand tasks can go to some other node). Could anyone clarify what's wrong with the way I am trying, or what I should do instead? Below are the cluster configuration and the experiments that I have tried. Any help will be appreciated. If you would like to recreate the scenario below, you may use the JavaWordCount.java example shipped with Spark.

*Cluster configuration:*
1. spark-1.4.0 and hadoop-2.7.1
2. Machines -- master node (master) and 6 worker nodes (node1 to node6)
3. master acts as -- Spark master, HDFS name node and secondary name node, YARN resource manager
4. Each of the 6 worker nodes acts as -- Spark worker node, HDFS data node, node manager

*Data on HDFS:*
A 20 MB text file is stored in a single block. With the replication factor of 3, the text file is stored on nodes 2, 3 and 4.

*Test-1 (Spark standalone mode):*
The application being run is the standard Java word count example with the above text file in HDFS as input. On job submission, I see in the Spark web UI that stage-0 (i.e. mapToPair) is being run on random nodes (i.e. node1, node2, node6, etc.). By random I mean that stage 0 executes on the very first worker node that gets registered to the application (this can be seen from the event timeline graph). Rather, I am expecting stage-0 to be run only on any one of the three nodes 2, 3, or 4.

*Test-2 (YARN cluster mode):*
Same as above. No data locality seen.

*Additional info:*
No other Spark applications are running, and I have even tried setting spark.locality.wait to 10s, but still no difference.

Thanks and regards, Sunil
RE: Any suggestion about sendMessageReliably failed because ack was not received within 120 sec
The closest information I can find online related to this error is https://issues.apache.org/jira/browse/SPARK-3633, but it is quite different in our case. In our case, we never saw the (Too many open files) error; the log just shows the 120 sec timeout. I checked all the GC output from all 42 executors; the max full GC real=11.79 secs is what I can find, way less than the 120 seconds timeout. Of the 42 executors, there is one executor whose stdout/stderr page hangs; I cannot see any GC or log information for this executor, but it is shown as LOADING on the master page, and I think the reason is just that the WorkerUI could not bind to 8081 somehow during boot time and bound to 8082 instead, and the master UI didn't catch that information. Anyway, my only option now is to increase the timeout of both spark.core.connection.ack.wait.timeout and spark.akka.timeout to 600, as suggested in the JIRA, and I will report back what I find later. This same daily job runs about 12 hours in Hive/MR, and can finish in about 4 hours in Spark (with 25% of allocated cluster resources). On this point, Spark is faster and great, but IF (big IF) every task runs smoothly. In Hive/MR, if the job is set up, it will finish, maybe slowly, but smoothly. In Spark, in this case, it does retry the failed partitions only, but we saw 4 or 5 retries sometimes, making it in fact much, much slower. Yong

From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Any suggestion about sendMessageReliably failed because ack was not received within 120 sec
Date: Thu, 20 Aug 2015 20:49:52 -0400

  Hi, Sparkers: After the first 2 weeks of Spark in our production cluster, and being more familiar with Spark, we are more confident about avoiding "Lost Executor" due to memory issues. So far, most of our jobs won't fail or slow down due to lost executors. But sometimes, I observed that individual tasks failed due to "sendMessageReliably failed because ack was not received within 120 sec". Here is the basic information: Spark 1.3.1 in 1 master + 42 worker boxes in standalone deployment. The cluster also runs Hadoop + MapReduce, so we allocate about 25% of resources to Spark. We are conservative with the Spark jobs, with a low number of cores + big parallelism/partitions to control the memory usage in the job; so far we have been able to avoid lost executors. We have one big daily job running with the following configuration:

  /opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042 --executor-memory 20G --total-executor-cores 168 --conf spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000 --conf spark.default.parallelism=6000 --conf spark.shuffle.blockTransferService=nio -i spark.script

  168 cores will make each executor run with 4 threads (168 / 42 = 4). There is no cache needed, so I make the storage memoryFraction very low. nio is much more robust than netty in our experience. For this big daily job generating over 2 of tasks, they all could finish without this issue, but sometimes, for the same job, tasks keep failing due to this error and retrying. But even in this case, I saw the task fail due to this error and retry. Retries may be part of life in a distributed environment, but I want to know what root cause could be behind it and how to avoid it. Should I increase spark.core.connection.ack.wait.timeout to fix this error? When this happened, I saw there were no executors lost; all were alive. Below is the message in the log; for example, it complained about a timeout connecting to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, reduceId=2577, message=
org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed because ack was not received within 120 sec
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at
RE: SparkR csv without headers
Hi, You can create a DataFrame using read.df() with a specified schema. Something like:

  schema <- structType(structField("a", "string"), structField("b", "integer"), ...)
  read.df(..., schema = schema)

From: Franc Carter [mailto:franc.car...@rozettatech.com]
Sent: Wednesday, August 19, 2015 1:48 PM
To: user@spark.apache.org
Subject: SparkR csv without headers

  Hi, Does anyone have an example of how to create a DataFrame in SparkR which specifies the column names - the csv files I have do not have column names in the first row. I can read a csv nicely with com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2, C3 etc. thanks

--
Franc Carter | Systems Architect | RoZetta Technology
L4. 55 Harrington Street, THE ROCKS, NSW, 2000
PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA
T +61 2 8355 2515 | www.rozettatechnology.com
Any suggestion about sendMessageReliably failed because ack was not received within 120 sec
Hi, Sparkers: After the first 2 weeks of Spark in our production cluster, and being more familiar with Spark, we are more confident about avoiding "Lost Executor" due to memory issues. So far, most of our jobs won't fail or slow down due to lost executors. But sometimes, I observed that individual tasks failed due to "sendMessageReliably failed because ack was not received within 120 sec". Here is the basic information: Spark 1.3.1 in 1 master + 42 worker boxes in standalone deployment. The cluster also runs Hadoop + MapReduce, so we allocate about 25% of resources to Spark. We are conservative with the Spark jobs, with a low number of cores + big parallelism/partitions to control the memory usage in the job; so far we have been able to avoid lost executors. We have one big daily job running with the following configuration:

/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042 --executor-memory 20G --total-executor-cores 168 --conf spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000 --conf spark.default.parallelism=6000 --conf spark.shuffle.blockTransferService=nio -i spark.script

168 cores will make each executor run with 4 threads (168 / 42 = 4). There is no cache needed, so I make the storage memoryFraction very low. nio is much more robust than netty in our experience. For this big daily job generating over 2 of tasks, they all could finish without this issue, but sometimes, for the same job, tasks keep failing due to this error and retrying. But even in this case, I saw the task fail due to this error and retry. Retries may be part of life in a distributed environment, but I want to know what root cause could be behind it and how to avoid it. Should I increase spark.core.connection.ack.wait.timeout to fix this error? When this happened, I saw there were no executors lost; all were alive. Below is the message in the log; for example, it complained about a timeout connecting to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, reduceId=2577, message=
org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed because ack was not received within 120 sec
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: sendMessageReliably failed because ack was not received within 120 sec
        at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:929)
        at org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:928)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:928)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
        at
Re: Kafka Spark Partition Mapping
In general you cannot guarantee which node an RDD will be processed on. The preferred location for a KafkaRDD is the Kafka leader for that partition, if they're deployed on the same machine. If you want to try to override that behavior, the method is getPreferredLocations. But even in that case, location preferences are just a scheduler hint; the RDD can still be scheduled elsewhere. You can turn up spark.locality.wait to a very high value to decrease the likelihood.

On Thu, Aug 20, 2015 at 5:47 PM, nehalsyed nehal_s...@cable.comcast.com wrote:
  I have data in Kafka topic-partitions and I am reading it from Spark like this:
    JavaPairReceiverInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]);
  I want messages from a Kafka partition to always land on the same machine in the Spark RDD, so I can cache some decoration data locally and later reuse it with other messages (that belong to the same key). Can anyone tell me how I can achieve it? Thanks
Re: what determine the task size?
cwz wrote:
  sorry, my question is not clear. I mean, what determines one task's size? Not how many tasks.

One task size = one HDFS block size.
spark kafka partitioning
Hello, regarding Spark Streaming and Kafka partitioning: when I send messages on a Kafka topic with 3 partitions and listen on a Kafka receiver with local[4], how will I come to know in Spark Streaming that different DStreams are created according to the partitions of the Kafka messages? Thanks
Re: How to list all dataframes and RDDs available in current session?
I am not sure if you can view all RDDs in a session. Tables are maintained in a catalogue, hence it's easier. However, you can see the DAG representation, which lists all the RDDs in a job, in the Spark UI.

On 20 Aug 2015 22:34, Dhaval Patel dhaval1...@gmail.com wrote:
  Hi: I have been working on a few examples using Zeppelin. I have been trying to find a command that would list all *dataframes/RDDs* that have been created in the current session. Does anyone know if any such command is available? Something similar to Spark SQL's way to list all temp tables: show tables; Thanks, Dhaval
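For the table side of the question, a small sketch of the catalogue calls available on a SQLContext (the same works on a HiveContext):

  // Names of tables registered in this SQLContext, temp tables included.
  sqlContext.tableNames().foreach(println)

  // Or as a DataFrame with tableName / isTemporary columns.
  sqlContext.tables().show()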
Re: Windowed stream operations -- These are too lazy for some use cases
On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes jgri...@adzerk.com wrote:
  We are aggregating real time logs of events, and want to do windows of 30 minutes. However, since the computation doesn't start until 30 minutes have passed, there is a ton of data built up that processing could've already started on. When it comes time to actually process the data, there is too much for our cluster to handle at once. The basic idea is this:
    val mergedMain = mergedStream
      .flatMap(r => ...) // denormalize data for this particular output stream
      .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
      .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 180, 180) // sum over the windows
      .map(rec => ...) // convert data to other format
      .foreachRDD{ (rdd, time) =>
        rdd.saveAsTextFile(...) // save to text files
      }
  I would want the batches to be reduced as soon as they arrive (the first reduceByKey), since there isn't any reason to wait. Instead all of the unprocessed data has to be processed at the same time (this data is being heavily denormalized in some cases, and so generates a bunch of additional data). Thanks for any help.

Could you add a dummy action at this point (right after the first reduceByKey)?

  val firstStep = mergedStream
    .flatMap(r => ...) // denormalize data for this particular output stream
    .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
    .persist() // this will be reused in windowing operations

  firstStep.count() // just to trigger computation

  firstStep
    .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 180, 180) // sum over the windows
    .map(rec => ...) // convert data to other format
    .foreachRDD{ (rdd, time) =>
      rdd.saveAsTextFile(...) // save to text files
    }

--
Iulian Dragos
Reactive Apps on the JVM
www.typesafe.com
Re: DAG related query
Hi Bahubali, Once RDDs are created, they are immutable (in most cases). In your case you end up with 3 RDDs:
(1) the original rdd1 that reads from the text file
(2) rdd2, that applies a map function on (1), and
(3) the new rdd1 that applies a map function on (2)
There's no cycle because you have 3 distinct RDDs. All you're doing is reassigning a reference `rdd1`, but the underlying RDD doesn't change. -Andrew

2015-08-20 6:21 GMT-07:00 Sean Owen so...@cloudera.com:
  No. The third line creates a third RDD whose reference simply replaces the reference to the first RDD in your local driver program. The first RDD still exists.
  On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain bahub...@gmail.com wrote:
    Hi, How would the DAG look like for the below code?
      JavaRDD<String> rdd1 = context.textFile("SOMEPATH");
      JavaRDD<String> rdd2 = rdd1.map(DO something);
      rdd1 = rdd2.map(Do SOMETHING);
    Does this lead to any kind of cycle? Thanks, Baahu
Re: Windowed stream operations -- These are too lazy for some use cases
I tried something like that. When I tried just doing count() on the DStream, it didn't seem like it was actually forcing the computation. What (sort of) worked was doing a foreachRDD((rdd) => rdd.count()), or doing a print() on the DStream. The only problem was this seemed to add a lot of processing overhead -- I couldn't figure out exactly why, but it seemed to have something to do with foreachRDD only being executed on the driver.

On Thu, Aug 20, 2015 at 1:39 PM, Iulian Dragoș iulian.dra...@typesafe.com wrote:
  Could you add a dummy action at this point?
    val firstStep = mergedStream
      .flatMap(r => ...) // denormalize data for this particular output stream
      .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
      .persist() // this will be reused in windowing operations
    firstStep.count() // just to trigger computation
Re: How to add a new column with date duration from 2 date columns in a dataframe
As Aram said, there are two options in Spark 1.4:

1) Use the HiveContext, then you get datediff from Hive: df.selectExpr("datediff(d2, d1)")

2) Use a Python UDF:

  from datetime import date
  df = sqlContext.createDataFrame([(date(2008, 8, 18), date(2008, 9, 26))], ['d1', 'd2'])
  from pyspark.sql.functions import udf
  from pyspark.sql.types import IntegerType
  diff = udf(lambda a, b: (a - b).days, IntegerType())
  df.select(diff(df.d1, df.d2)).show()

  +-----------------------+
  |PythonUDF#lambda(d1,d2)|
  +-----------------------+
  |                    -39|
  +-----------------------+
Re: [survey] [spark-ec2] What do you like/dislike about spark-ec2?
I'm planning to close the survey to further responses early next week. If you haven't chimed in yet, the link to the survey is here: http://goo.gl/forms/erct2s6KRR We already have some great responses, which you can view. I'll share a summary after the survey is closed. Cheers! Nick On Mon, Aug 17, 2015 at 11:09 AM Nicholas Chammas nicholas.cham...@gmail.com wrote: Howdy folks! I’m interested in hearing about what people think of spark-ec2 http://spark.apache.org/docs/latest/ec2-scripts.html outside of the formal JIRA process. Your answers will all be anonymous and public. If the embedded form below doesn’t work for you, you can use this link to get the same survey: http://goo.gl/forms/erct2s6KRR Cheers! Nick
Re: How to add a new column with date duration from 2 date columns in a dataframe
More update on this question... I am using Spark 1.4.1. I was just reading the documentation of Spark 1.5 (still in development) and I think there will be a new function *datediff* that will solve the issue. So please let me know if there is any work-around until Spark 1.5 is out :).

pyspark.sql.functions.datediff(end, start)
  Returns the number of days from start to end.

  df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
  df.select(datediff(df.d2, df.d1).alias('diff')).collect()
  [Row(diff=32)]

  New in version 1.5.
Re: DAG related query
No. The third line creates a third RDD whose reference simply replaces the reference to the first RDD in your local driver program. The first RDD still exists.

On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain bahub...@gmail.com wrote:
  Hi, How would the DAG look like for the below code?
    JavaRDD<String> rdd1 = context.textFile("SOMEPATH");
    JavaRDD<String> rdd2 = rdd1.map(DO something);
    rdd1 = rdd2.map(Do SOMETHING);
  Does this lead to any kind of cycle? Thanks, Baahu
PySpark concurrent jobs using single SparkContext
Hi all, We're using Spark 1.3.0 via a small YARN cluster to do some log processing. The jobs are pretty simple: for a number of customers and a number of days, fetch some event log data, build aggregates, and store those aggregates into a data store. The way our script is written right now does something akin to:

  with SparkContext() as sc:
      for customer in customers:
          for day in days:
              logs = sc.textFile(get_logs(customer, day))
              aggregate = make_aggregate(logs)
              # This function contains the action saveAsNewAPIHadoopFile which
              # triggers a save
              save_aggregate(aggregate)

So we have a Spark job per customer, per day. I tried doing some parallel job submission with something similar to:

  def make_and_save_aggregate(customer, day, spark_context):
      # Without a separate threading.Lock() here or better yet, one guarding the
      # Spark context, multiple customer/day transformations and actions could
      # be interweaved
      sc = spark_context
      logs = sc.textFile(get_logs(customer, day))
      aggregate = make_aggregate(logs)
      save_aggregate(aggregate)

  with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
      for customer in customers:
          for day in days:
              executor.submit(make_and_save_aggregate, customer, day, sc)

The problem is, with no locks on a SparkContext except during initialization https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241 and shutdown https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307, operations on the context could (if I understand correctly) be interweaved, leading to a DAG which contains transformations out of order and from different customer/day periods. One solution is instead to launch multiple Spark jobs via spark-submit and let YARN/Spark's dynamic executor allocation take care of fair scheduling. In practice, this doesn't seem to yield very fast computation, perhaps due to some additional overhead with YARN. Is there any safe way to launch concurrent jobs like this using a single PySpark context?

--
Mike Sukmanowsky
Aspiring Digital Carpenter
e: mike.sukmanow...@gmail.com
LinkedIn http://www.linkedin.com/profile/view?id=10897143 | github https://github.com/msukmanowsky
Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?
What version of Spark are you using? Have you set any shuffle configs?

On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote:
  I have one Spark job which seems to run fine, but after one hour or so executors start getting lost because of a timeout, something like the following error: cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds 60 seconds. And because of the above error a couple of chained errors start to come, like FetchFailedException, Rpc client disassociated, Connection reset by peer, IOException etc. Please see the following UI page. I have noticed that when shuffle read/write starts to increase to more than 10 GB, executors start getting lost because of the timeout. How do I clear this stacked memory of 10 GB in the shuffle read/write section? I don't cache anything; why is Spark not clearing that memory? Please guide. IMG_20150819_231418358.jpg http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data
Hi Hemant, sorry for the confusion. I meant the final output part files in the final HDFS directory; I never meant intermediate files. Thanks. My goal is to reduce those many files because of my use case, explained in the first email with calculations.

On Aug 20, 2015 5:59 PM, Hemant Bhanawat hemant9...@gmail.com wrote:
  Sorry, I misread your mail. Thanks for pointing that out. BTW, are the 800,000 files shuffle intermediate output and not the final output? I assume yes. I didn't know that you can keep intermediate output on HDFS, and I don't think that is recommended.
  On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat hemant9...@gmail.com wrote:
    Looks like you are using hash based shuffling and not sort based shuffling, which creates a single file per map task.
    On Thu, Aug 20, 2015 at 12:43 AM, unk1102 umesh.ka...@gmail.com wrote:
      Hi, I have a Spark job which deals with a large skewed dataset. I have around 1000 Hive partitions to process in four different tables every day. So if I go with the 200 spark.sql.shuffle.partitions default partitions created by Spark, I end up with 4 * 1000 * 200 = 800,000 small files in HDFS, which won't be good for the HDFS name node. I have been told that if you keep on creating such a large number of small files, the namenode will crash; is it true? Please help me understand. Anyway, to avoid creating small files I set spark.sql.shuffle.partitions=1. It seems to be creating 1 output file, and as per my understanding, because of only one output there is so much shuffling to bring all data to one reducer; please correct me if I am wrong. This is causing memory/timeout issues. How do I deal with it? I tried to give spark.shuffle.storage=0.7 also; still this memory seems not enough for it. I have 25 GB executors with 4 cores and 20 such executors, and still the Spark job fails. Please guide.
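Not from the thread, but a hedged sketch of one middle ground between the two extremes being discussed: keep the shuffle parallel and only collapse the partition count at write time. The numbers, query and output path are illustrative only:

  // Keep a reasonable number of shuffle partitions so the aggregation stays parallel.
  sqlContext.setConf("spark.sql.shuffle.partitions", "200")

  val result = sqlContext.sql("SELECT ... FROM ... GROUP BY ...")  // the skewed aggregation

  // Collapse to a handful of part files only when writing; coalesce avoids the
  // extra full shuffle that repartition would do.
  result.coalesce(10).write.parquet("/output/path")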
Re: SparkSQL concerning materials
Hi Dawid, The best place to get started is the Spark SQL Guide from Apache: http://spark.apache.org/docs/latest/sql-programming-guide.html Regards Muhammad

On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote:
  Hi, I would like to dip into SparkSQL and get to know the architecture, good practices, and some internals better. Could you recommend some materials on this matter? Regards Dawid
Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?
Moving this back onto user@. Regarding GC, can you look in the web UI and see whether the GC time metric dominates the amount of time spent on each task (or at least the tasks that aren't completing)? Also, have you tried bumping your spark.yarn.executor.memoryOverhead? YARN may be killing your executors for using too much off-heap space. You can see whether this is happening by looking in the Spark AM or YARN NodeManager logs. -Sandy

On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote:
  Hi, thanks much for the response. Yes, I tried the default setting of 0.2 too; it was also going into timeout. If it is spending time in GC, then why is it not throwing a GC error? I don't see any such error. YARN logs are not helpful at all. What is Tungsten, and how do I use it? Spark is doing great; I believe my job runs successfully and 60% of tasks complete, and only after the first executor gets lost do things start messing up.
  On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:
    What sounds most likely is that you're hitting heavy garbage collection. Did you hit issues when the shuffle memory fraction was at its default of 0.2? A potential danger with setting the shuffle storage to 0.7 is that it allows shuffle objects to get into the GC old generation, which triggers more stop-the-world garbage collections. Have you tried enabling Tungsten / unsafe? Unfortunately, Spark is still not that great at dealing with heavily-skewed shuffle data, because its reduce-side aggregation still operates on Java objects instead of binary data. -Sandy
    On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com wrote:
      Hi Sandy, thanks much for the response. I am using Spark 1.4.1 and I have set spark.shuffle.storage to 0.7, as my Spark job involves 4 group-by queries executed using hiveContext.sql and my data set is skewed, so there will be more shuffling, I believe. I don't know what's wrong; the Spark job runs fine for almost an hour, and when the shuffle read / shuffle write column in the UI starts to show more than 10 GB, executors start getting lost because of the timeout, and slowly other executors start getting lost. Please guide.
      On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:
        What version of Spark are you using? Have you set any shuffle configs?
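A small sketch of the overhead bump Sandy mentions, as it might be set for a YARN job; the 4096 MB figure is purely illustrative, not from the thread:

  import org.apache.spark.SparkConf

  // Equivalent to passing --conf spark.yarn.executor.memoryOverhead=4096 to
  // spark-submit: extra off-heap headroom YARN grants each executor container
  // on top of --executor-memory.
  val conf = new SparkConf()
    .set("spark.yarn.executor.memoryOverhead", "4096")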
Re: Convert mllib.linalg.Matrix to Breeze
Matrix.toBreeze is a private method. MLlib matrices have the same structure as Breeze matrices. Just create a new Breeze matrix like this https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270 . Best, Burak

On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang yblia...@gmail.com wrote:
  You can use Matrix.toBreeze() https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56 .
  2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com:
    Hi All, Is there any way to convert an MLlib matrix to a Breeze DenseMatrix? Any leads are appreciated. Thanks, Naveen
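A minimal sketch of what that construction might look like, assuming a dense, non-transposed MLlib matrix; both formats store their values in column-major order, so the array can be passed straight through:

  import breeze.linalg.{DenseMatrix => BDM}
  import org.apache.spark.mllib.linalg.Matrix

  // Build a Breeze DenseMatrix from an MLlib Matrix by reusing its values.
  def toBreeze(m: Matrix): BDM[Double] =
    new BDM[Double](m.numRows, m.numCols, m.toArray)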
Data frame created from hive table and its partition
Hi, I have a question regarding data frame partitioning. I read a Hive table from Spark, and the following Spark API converts it to a DataFrame:

  test_df = sqlContext.sql("select * from hivetable1")

How does Spark decide the partitioning of test_df? Is there a way to partition test_df based on some column while reading the Hive table? The second question is: if that Hive table has a primary key declared, does Spark honor the PK in the Hive table and partition based on PKs? Thanks Vijay
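No reply appears in this digest; for inspection, a hedged sketch of checking the partition count produced by the Hive/HDFS input splits and changing it afterwards. The target count of 200 is just an example:

  val test_df = sqlContext.sql("select * from hivetable1")

  // Initial partitioning follows the underlying input splits, not any primary key.
  println(test_df.rdd.partitions.length)

  // The count can be changed explicitly after reading; only a target number is
  // given here, since repartitioning a DataFrame by a column expression arrived
  // in later releases.
  val repartitioned = test_df.repartition(200)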
Re: Transformation not happening for reduceByKey or GroupByKey
Hi All, Could anybody let me know what it is that I am missing here? It should work, as it is a basic transformation. Please let me know if any additional information is required.

On Thu, Aug 20, 2015 at 3:35 PM, satish chandra j jsatishchan...@gmail.com wrote:
  Hi All, I have data in an RDD as mentioned below:
    RDD: Array[(Int, Int)] = Array((0,1), (0,2), (1,20), (1,30), (2,40))
  I am expecting output as Array((0,3), (1,50), (2,40)), just a sum function on the values for each key.
  Code:
    RDD.reduceByKey((x,y) => x+y)
    RDD.take(3)
  Result in console:
    RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at console:73
    res: Array[(Int,Int)] = Array()
  Command as mentioned:
    dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile
  Please let me know what is missing in my code, as my resultant Array is empty. Regards, Satish
Re: How to add a new column with date duration from 2 date columns in a dataframe
Hi, hope this will help you:

import org.apache.spark.sql.functions._
import sqlContext.implicits._
import java.sql.Timestamp
import org.joda.time.{DateTime, Days}

val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")
val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
  Days.daysBetween(new DateTime(value2.getTime), new DateTime(value1.getTime)).getDays)
df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()

Or you can write a SQL query using HiveQL's datediff function: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel dhaval1...@gmail.com wrote: More update on this question: I am using Spark 1.4.1. I was just reading the documentation of Spark 1.5 (still in development) and I think there will be a new function *datediff* that will solve the issue. So please let me know if there is any work-around until Spark 1.5 is out :).

pyspark.sql.functions.datediff(end, start) [source] http://people.apache.org/~pwendell/spark-releases/spark-1.5.0-preview-20150812-docs/api/python/_modules/pyspark/sql/functions.html#datediff http://people.apache.org/~pwendell/spark-releases/spark-1.5.0-preview-20150812-docs/api/python/pyspark.sql.html#pyspark.sql.functions.datediff
Returns the number of days from start to end.
df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
df.select(datediff(df.d2, df.d1).alias('diff')).collect()   # [Row(diff=32)]
New in version 1.5.

On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel dhaval1...@gmail.com wrote: Apologies, sent too early accidentally. Actual message is below. A dataframe has 2 date columns (datetime type) and I would like to add another column that would have the difference between these two dates. Dataframe snippet is below.

new_df.show(5)
+-----------+----------+--------------+
|      PATID|   SVCDATE|next_diag_date|
+-----------+----------+--------------+
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-27|
+-----------+----------+--------------+

Here is what I have tried so far:
- new_df.withColumn('SVCDATE2', (new_df.next_diag_date - new_df.SVCDATE)).show()
  Error: DateType does not support numeric operations
- new_df.withColumn('SVCDATE2', (new_df.next_diag_date - new_df.SVCDATE).days).show()
  Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);

However this simple Python code works fine with pySpark:
from datetime import date
d0 = date(2008, 8, 18)
d1 = date(2008, 9, 26)
delta = d0 - d1
print (d0 - d1).days # -39

Any suggestions would be appreciated! Also, is there a way to add a new column in a dataframe without using a column expression (e.g. like in pandas or R: df$new_col = 'new col value')? Thanks, Dhaval

On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel dhaval1...@gmail.com wrote:
new_df.withColumn('SVCDATE2', (new_df.next_diag_date - new_df.SVCDATE).days).show()
+-----------+----------+--------------+
|      PATID|   SVCDATE|next_diag_date|
+-----------+----------+--------------+
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-27|
+-----------+----------+--------------+
DAG related query
Hi, How would the DAG look for the below code?

JavaRDD<String> rdd1 = context.textFile(SOMEPATH);
JavaRDD<String> rdd2 = rdd1.map(/* do something */);
rdd1 = rdd2.map(/* do something */);

Does this lead to any kind of cycle? Thanks, Baahu
Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data
Sorry, I misread your mail. Thanks for pointing that out. BTW, are the 800,000 files shuffle intermediate output and not the final output? I assume yes. I didn't know that you could keep intermediate output on HDFS, and I don't think that is recommended. On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Looks like you are using hash-based shuffling and not sort-based shuffling, which creates a single file per map task. On Thu, Aug 20, 2015 at 12:43 AM, unk1102 umesh.ka...@gmail.com wrote: Hi, I have a Spark job which deals with a large skewed dataset. I have around 1000 Hive partitions to process in four different tables every day. So if I go with the default of 200 spark.sql.shuffle.partitions created by Spark, I end up with 4 * 1000 * 200 = 800,000 small files in HDFS, which won't be good for the HDFS name node. I have been told that if you keep creating such a large number of small files the namenode will crash - is that true? Please help me understand. Anyway, to avoid creating small files I set spark.sql.shuffle.partitions=1, and it does seem to create 1 output file; as per my understanding, because there is only one output there is a lot of shuffling to bring all data to one reducer - please correct me if I am wrong. This is causing memory/timeout issues. How do I deal with it? I also tried spark.shuffle.storage=0.7, but still this memory seems not to be enough. I have 25 gb executors with 4 cores, 20 such executors, and still the Spark job fails. Please guide. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
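If the goal is fewer output files rather than a single reducer, one middle ground is to keep spark.sql.shuffle.partitions at a moderate value and shrink the number of partitions only at write time. A rough sketch (the query, path and numbers are made up):

  sqlContext.setConf("spark.sql.shuffle.partitions", "200")
  val result = sqlContext.sql("select key, count(*) from hivetable1 group by key")
  // write 20 files instead of 200; repartition does shuffle, but only the final result
  result.repartition(20).write.parquet("/user/out/hivetable1_agg")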
Re: Convert mllib.linalg.Matrix to Breeze
Hi, Thanks for the reply. I tried Matrix.toBreeze() which returns the following error: */method toBreeze in trait Matrix cannot be accessed in org.apache.spark.mllib.linalg.Matrix/* On Thursday 20 August 2015 07:50 PM, Burak Yavuz wrote: Matrix.toBreeze is a private method. MLlib matrices have the same structure as Breeze Matrices. Just create a new Breeze matrix like this https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270. Best, Burak On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang yblia...@gmail.com mailto:yblia...@gmail.com wrote: You can use Matrix.toBreeze() https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56 . 2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com mailto:nav...@formcept.com: Hi All, Is there anyway to convert a mllib matrix to a Dense Matrix of Breeze? Any leads are appreciated. Thanks, Naveen - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org
Re: SparkR - can't create spark context - JVM not ready
Thanks Shivaram. You got me wondering about the path so I put it in full and it worked. R does not, of course, expand a ~. On Thu, Aug 20, 2015 at 4:35 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: Can you check if the file `~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit` exists ? The error message seems to indicate it is trying to pick up Spark from that location and can't seem to find Spark installed there. Thanks Shivaram On Thu, Aug 20, 2015 at 3:30 PM, Deborah Siegel deborah.sie...@gmail.com wrote: Hello, I have previously successfully run SparkR in RStudio, with: Sys.setenv(SPARK_HOME=~/software/spark-1.4.1-bin-hadoop2.4) .libPaths(c(file.path(Sys.getenv(SPARK_HOME), R, lib), .libPaths())) library(SparkR) sc - sparkR.init(master=local[2],appName=SparkR-example) Then I tried putting some of it into an .Rprofile. It seemed to work to load the paths and SparkR, but I got an error when trying to create the sc. I then removed my .Rprofile, as well as .rstudio-desktop. However, I still cannot create the sc. Here is the error sc - sparkR.init(master=local[2],appName=SparkR-example) Launching java with spark-submit command ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit sparkr-shell /var/folders/p7/k1bpgmx93yd6pjq7dzf35gk8gn/T//RtmpOitA28/backend_port23377046db sh: ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit: No such file or directory Error in sparkR.init(master = local[2], appName = SparkR-example) : JVM is not ready after 10 seconds I suspected there was an incomplete process or something. I checked for any running R or Java processes and there were none. Has someone seen this type of error? I have the same error in both RStudio and in R shell (but not sparkR wrapper). Thanks, Deb
Re: spark kafka partitioning
I'm not clear on your question, can you rephrase it? Also, are you talking about createStream or createDirectStream? On Thu, Aug 20, 2015 at 9:48 PM, Gaurav Agarwal gaurav130...@gmail.com wrote: Hello Regarding Spark Streaming and Kafka Partitioning When i send message on kafka topic with 3 partitions and listens on kafkareceiver with local value[4] . how will i come to know in Spark Streaming that different Dstreams are created according to partitions of kafka messages . Thanks
Re: SparkR csv without headers
Thanks - works nicely, cheers. On Fri, Aug 21, 2015 at 12:43 PM, Sun, Rui rui@intel.com wrote: Hi, You can create a DataFrame using load.df() with a specified schema. Something like:

schema <- structType(structField("a", "string"), structField("b", "integer"), …)
read.df(…, schema = schema)

*From:* Franc Carter [mailto:franc.car...@rozettatech.com] *Sent:* Wednesday, August 19, 2015 1:48 PM *To:* user@spark.apache.org *Subject:* SparkR csv without headers Hi, Does anyone have an example of how to create a DataFrame in SparkR which specifies the column names - the csv files I have do not have column names in the first row. I can read a csv nicely with com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2, C3 etc. Thanks -- *Franc Carter* | Systems Architect | RoZetta Technology L4. 55 Harrington Street, THE ROCKS, NSW, 2000 PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA *T* +61 2 8355 2515 | www.rozettatechnology.com DISCLAIMER: The contents of this email, inclusive of attachments, may be legally privileged and confidential. Any unauthorised use of the contents is expressly prohibited.
Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?
GC wouldn't necessarily result in errors - it could just be slowing down your job and causing the executor JVMs to stall. If you click on a stage in the UI, you should end up on a page with all the metrics concerning the tasks that ran in that stage. GC Time is one of these task metrics. -Sandy On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi where do I see GC time in UI? I have set spark.yarn.executor.memoryOverhead as 3500 which seems to be good enough I believe. So you mean only GC could be the reason behind timeout I checked Yarn logs I did not see any GC error there. Please guide. Thanks much. On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Moving this back onto user@ Regarding GC, can you look in the web UI and see whether the GC time metric dominates the amount of time spent on each task (or at least the tasks that aren't completing)? Also, have you tried bumping your spark.yarn.executor.memoryOverhead? YARN may be killing your executors for using too much off-heap space. You can see whether this is happening by looking in the Spark AM or YARN NodeManager logs. -Sandy On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi thanks much for the response. Yes I tried default settings too 0.2 it was also going into timeout if it is spending time in GC then why it is not throwing GC error I don't see any such error. Yarn logs are not helpful at all. What is tungsten how do I use it? Spark is doing great I believe my job runs successfully and 60% tasks completes only after first executor gets lost things are messing. On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote: What sounds most likely is that you're hitting heavy garbage collection. Did you hit issues when the shuffle memory fraction was at its default of 0.2? A potential danger with setting the shuffle storage to 0.7 is that it allows shuffle objects to get into the GC old generation, which triggers more stop-the-world garbage collections. Have you tried enabling Tungsten / unsafe? Unfortunately, Spark is still not that great at dealing with heavily-skewed shuffle data, because its reduce-side aggregation still operates on Java objects instead of binary data. -Sandy On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I have set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby queries executed using hiveContext.sql my data set is skewed so will be more shuffling I believe I don't know what's wrong spark job runs fine for almost an hour and when shuffle read shuffle write column in UI starts to show more than 10 gb executor starts to getting lost because of timeout and slowly other executor starts getting lost. Please guide. On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote: What version of Spark are you using? Have you set any shuffle configs? 
On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote: I have one Spark job which seems to run fine but after one hour or so executor start getting lost because of time out something like the following error cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds 60 seconds and because of above error couple of chained errors starts to come like FetchFailedException, Rpc client disassociated, Connection reset by peer, IOException etc Please see the following UI page I have noticed when shuffle read/write starts to increase more than 10 GB executors starts getting lost because of timeout. How do I clear this stacked memory of 10 GB in shuffle read/write section I dont cache anything why Spark is not clearing those memory. Please guide. IMG_20150819_231418358.jpg http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Creating Spark DataFrame from large pandas DataFrame
If you would like to try using spark-csv, please use `pyspark --packages com.databricks:spark-csv_2.11:1.2.0` You're missing a dependency. Best, Burak On Thu, Aug 20, 2015 at 1:08 PM, Charlie Hack charles.t.h...@gmail.com wrote: Hi, I'm new to spark and am trying to create a Spark df from a pandas df with ~5 million rows. Using Spark 1.4.1. When I type: df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(didf), None)) (the df.where is a hack I found on the Spark JIRA to avoid a problem with NaN values making mixed column types) I get: TypeError: cannot create an RDD from type: type 'list' Converting a smaller pandas dataframe (~2000 rows) works fine. Anyone had this issue? This is already a workaround-- ideally I'd like to read the spark dataframe from a Hive table. But this is currently not an option for my setup. I also tried reading the data into spark from a CSV using spark-csv. Haven't been able to make this work as yet. I launch $ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar and when I attempt to read the csv I get: Py4JJavaError: An error occurred while calling o22.load. : java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ... Other options I can think of: - Convert my CSV to json (use Pig?) and read into Spark - Read in using jdbc connect from postgres But want to make sure I'm not misusing Spark or missing something obvious. Thanks! Charlie
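In case it helps anyone following along in Scala, the equivalent read with the spark-csv data source looks roughly like this (a sketch; the path is made up, and the artifact suffix should match your Scala build, _2.10 or _2.11):

  // started with: spark-shell --packages com.databricks:spark-csv_2.10:1.2.0
  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load("/data/input.csv")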
org.apache.hadoop.security.AccessControlException: Permission denied when access S3
Hi All, I am trying to access a file on S3 using the Hadoop file API. Below is my code:

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.awsAccessKeyId", this.getAwsAccessKeyId());
hadoopConf.set("fs.s3n.awsSecretAccessKey", this.getAwsSecretAccessKey());
lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class, LongWritable.class, Text.class, hadoopConf)
    .values()
    .map(new Function<Text, String>() {
        @Override
        public String call(Text arg0) throws Exception {
            return arg0.toString();
        }
    });

And I get the error below:

Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: s3n://
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)

The permissions should not be the problem (because I can use ctx.textFile without any issue), so the issue comes from the newAPIHadoopFile call. Is there anything else I need to set up for this? Regards, Shuai
Re: SparkSQL concerning materials
See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package Cheers On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com wrote: Hi Dawid The best pace to get started is the Spark SQL Guide from Apache http://spark.apache.org/docs/latest/sql-programming-guide.html Regards Muhammad On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, I would like to dip into SparkSQL. Get to know better the architecture, good practices, some internals. Could you advise me some materials on this matter? Regards Dawid
Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?
Hi where do I see GC time in UI? I have set spark.yarn.executor.memoryOverhead as 3500 which seems to be good enough I believe. So you mean only GC could be the reason behind timeout I checked Yarn logs I did not see any GC error there. Please guide. Thanks much. On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Moving this back onto user@ Regarding GC, can you look in the web UI and see whether the GC time metric dominates the amount of time spent on each task (or at least the tasks that aren't completing)? Also, have you tried bumping your spark.yarn.executor.memoryOverhead? YARN may be killing your executors for using too much off-heap space. You can see whether this is happening by looking in the Spark AM or YARN NodeManager logs. -Sandy On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi thanks much for the response. Yes I tried default settings too 0.2 it was also going into timeout if it is spending time in GC then why it is not throwing GC error I don't see any such error. Yarn logs are not helpful at all. What is tungsten how do I use it? Spark is doing great I believe my job runs successfully and 60% tasks completes only after first executor gets lost things are messing. On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote: What sounds most likely is that you're hitting heavy garbage collection. Did you hit issues when the shuffle memory fraction was at its default of 0.2? A potential danger with setting the shuffle storage to 0.7 is that it allows shuffle objects to get into the GC old generation, which triggers more stop-the-world garbage collections. Have you tried enabling Tungsten / unsafe? Unfortunately, Spark is still not that great at dealing with heavily-skewed shuffle data, because its reduce-side aggregation still operates on Java objects instead of binary data. -Sandy On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I have set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby queries executed using hiveContext.sql my data set is skewed so will be more shuffling I believe I don't know what's wrong spark job runs fine for almost an hour and when shuffle read shuffle write column in UI starts to show more than 10 gb executor starts to getting lost because of timeout and slowly other executor starts getting lost. Please guide. On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote: What version of Spark are you using? Have you set any shuffle configs? On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote: I have one Spark job which seems to run fine but after one hour or so executor start getting lost because of time out something like the following error cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds 60 seconds and because of above error couple of chained errors starts to come like FetchFailedException, Rpc client disassociated, Connection reset by peer, IOException etc Please see the following UI page I have noticed when shuffle read/write starts to increase more than 10 GB executors starts getting lost because of timeout. How do I clear this stacked memory of 10 GB in shuffle read/write section I dont cache anything why Spark is not clearing those memory. Please guide. 
IMG_20150819_231418358.jpg http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
dataframe json schema scan
The doc for DataFrameReader#json(RDD[String]) method says Unless the schema is specified using schema function, this function goes through the input once to determine the input schema. https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader Why is this necessary? Why can't it create the dataframe at the same time as it's determining the schema? Thanks.
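Presumably the extra pass is needed because schema inference has to see every record to merge their types before any rows can be interpreted. The escape hatch the doc mentions is to pass the schema yourself, in which case the input is only read when the DataFrame is evaluated. A minimal sketch (field names are made up):

  import org.apache.spark.sql.types._

  val jsonLines = sc.parallelize(Seq("""{"id": 1, "name": "a"}""", """{"id": 2, "name": "b"}"""))
  val schema = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = true)))
  val df = sqlContext.read.schema(schema).json(jsonLines)   // no separate inference pass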
Re: Data frame created from hive table and its partition
Thanks Michael. My bad regarding hive table primary keys. I have one big 140GB hdfs file and external hive table defined on it. Table is not partitioned. When I read external hive table using sqlContext.sql, how does spark decides number of partitions which should be created for that data frame? SparkUI tells me that 1000+ tasks are created to read the above mentioned table. I guess one task per hdfs block. Does that mean it creates 1000+ partition created for DF? Is there a way to (hash)partition data frame on specific key column[s] when I read/load the hive table in spark? Thanks, Vijay On Aug 20, 2015, at 3:05 PM, Michael Armbrust mich...@databricks.com wrote: There is no such thing as primary keys in the Hive metastore, but Spark SQL does support partitioned hive tables: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables DataFrameWriter also has a partitionBy method. On Thu, Aug 20, 2015 at 7:29 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io mailto:sparkh...@data2o.io wrote: Hi I have a question regarding data frame partition. I read a hive table from spark and following spark api converts it as DF. test_df = sqlContext.sql(“select * from hivetable1”) How does spark decide partition of test_df? Is there a way to partition test_df based on some column while reading hive table? Second question is, if that hive table has primary key declared, does spark honor PK in hive table and partition based on PKs? Thanks Vijay - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org
load NULL Values in RDD
Hi , Can anyone help me in loading a column that may or may not have NULL values in a RDD. Thanks
Creating Spark DataFrame from large pandas DataFrame
Hi, I'm new to spark and am trying to create a Spark df from a pandas df with ~5 million rows. Using Spark 1.4.1. When I type: df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(didf), None)) (the df.where is a hack I found on the Spark JIRA to avoid a problem with NaN values making mixed column types) I get: TypeError: cannot create an RDD from type: type 'list' Converting a smaller pandas dataframe (~2000 rows) works fine. Anyone had this issue? This is already a workaround-- ideally I'd like to read the spark dataframe from a Hive table. But this is currently not an option for my setup. I also tried reading the data into spark from a CSV using spark-csv. Haven't been able to make this work as yet. I launch $ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar and when I attempt to read the csv I get: Py4JJavaError: An error occurred while calling o22.load. : java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ... Other options I can think of: - Convert my CSV to json (use Pig?) and read into Spark - Read in using jdbc connect from postgres But want to make sure I'm not misusing Spark or missing something obvious. Thanks! Charlie
Re: DataFrameWriter.jdbc is very slow
We will probably fix this in Spark 1.6 https://issues.apache.org/jira/browse/SPARK-10040 On Thu, Aug 20, 2015 at 5:18 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: We want to migrate our data (approximately 20M rows) from parquet to postgres, when we are using dataframe writer's jdbc method the execution time is very large, we have tried the same with batch insert it was much effective. Is it intentionally implemented in that way?
Re: Data frame created from hive table and its partition
There is no such thing as primary keys in the Hive metastore, but Spark SQL does support partitioned hive tables: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables DataFrameWriter also has a partitionBy method. On Thu, Aug 20, 2015 at 7:29 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote: Hi I have a question regarding data frame partition. I read a hive table from spark and following spark api converts it as DF. test_df = sqlContext.sql(“select * from hivetable1”) How does spark decide partition of test_df? Is there a way to partition test_df based on some column while reading hive table? Second question is, if that hive table has primary key declared, does spark honor PK in hive table and partition based on PKs? Thanks Vijay - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
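A minimal sketch of the writer-side partitioning mentioned above (the table, columns and path are made up; note this controls the directory layout of the data being written, it does not repartition the DataFrame you read back):

  val test_df = sqlContext.sql("select * from hivetable1")
  test_df.write
    .partitionBy("year", "month")                // hypothetical partition columns
    .parquet("/warehouse/hivetable1_by_month")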
FAILED_TO_UNCOMPRESS error from Snappy
Right after upgraded to 1.4.1, we started seeing this exception and yes we picked up snappy-java-1.1.1.7 (previously snappy-java-1.1.1.6). Is there anything I could try ? I don't have a repro case. org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:169) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:168) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:168) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) at org.xerial.snappy.Snappy.uncompress(Snappy.java:480) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at 
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160) at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1178) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at scala.util.Success.map(Try.scala:206) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) ... 33 more -- Kohki Nishio
Re: Saving and loading MLlib models as standalone (no Hadoop)
You can't serialize models out of Spark and then use them outside of the Spark context. However there is support for the PMML format - have a look at https://spark.apache.org/docs/latest/mllib-pmml-model-export.html Robin --- Robin East Spark GraphX in Action Michael Malak and Robin East Manning Publications Co. http://www.manning.com/malak/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Saving-and-loading-MLlib-models-as-standalone-no-Hadoop-tp24216p24371.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
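For the model types covered by the exporter (k-means and the linear models, as of Spark 1.4), the export itself is a one-liner. A sketch with made-up data and paths:

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  val data = sc.parallelize(Seq(Vectors.dense(0.0, 0.0), Vectors.dense(9.0, 9.0)))
  val model = KMeans.train(data, k = 2, maxIterations = 10)
  model.toPMML("/tmp/kmeans.pmml")   // writes the PMML XML to a local file, no Hadoop involved
  println(model.toPMML())            // or just get the XML as a String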
Re: FAILED_TO_UNCOMPRESS error from Snappy
https://issues.apache.org/jira/browse/SPARK-7660 ? -- Ruslan Dautkhanov On Thu, Aug 20, 2015 at 1:49 PM, Kohki Nishio tarop...@gmail.com wrote: Right after upgraded to 1.4.1, we started seeing this exception and yes we picked up snappy-java-1.1.1.7 (previously snappy-java-1.1.1.6). Is there anything I could try ? I don't have a repro case. org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:169) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:168) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:168) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) at org.xerial.snappy.Snappy.uncompress(Snappy.java:480) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135) at 
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160) at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1178) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at scala.util.Success.map(Try.scala:206) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) ... 33 more -- Kohki Nishio
Re: SparkSQL concerning materials
Or if you're a python lover then this is a good place - https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html# On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote: See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package Cheers On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com wrote: Hi Dawid The best pace to get started is the Spark SQL Guide from Apache http://spark.apache.org/docs/latest/sql-programming-guide.html Regards Muhammad On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, I would like to dip into SparkSQL. Get to know better the architecture, good practices, some internals. Could you advise me some materials on this matter? Regards Dawid
Re: How to get the radius of clusters in spark K means
Okay. Thanks. I already did that and wanted to check whether is there any other method to extract it from the model itself. Thanks again for the help. On Thu, Aug 20, 2015 at 8:39 PM, Robin East robin.e...@xense.co.uk wrote: There is no cluster radius method on the model returned from K-means. You’ll need to roll it yourself by generating the distance from each point in the cluster to the cluster center itself and then take the max. --- Robin East *Spark GraphX in Action* Michael Malak and Robin East Manning Publications Co. http://www.manning.com/malak/ On 20 Aug 2015, at 07:14, ashensw as...@wso2.com wrote: We can get cluster centers in K means clustering. Like wise is there any method in spark to get the cluster radius? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-radius-of-clusters-in-spark-K-means-tp24353.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Ashen Weerathunga* Software Engineer - Intern WSO2 Inc.: http://wso2.com lean.enterprise.middleware Email: as...@wso2.com Mobile: +94 716042995 94716042995 LinkedIn: *http://lk.linkedin.com/in/ashenweerathunga http://lk.linkedin.com/in/ashenweerathunga*
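To make the roll-it-yourself suggestion concrete, a sketch assuming data is the RDD[Vector] the model was trained on and model is the fitted KMeansModel:

  import org.apache.spark.mllib.linalg.Vectors

  // distance of each point to its own cluster centre, then the max per cluster
  val radii = data.map { v =>
    val c = model.predict(v)
    (c, math.sqrt(Vectors.sqdist(v, model.clusterCenters(c))))
  }.reduceByKey((a, b) => math.max(a, b))

  radii.collect().foreach { case (c, r) => println(s"cluster $c radius $r") }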
How to list all dataframes and RDDs available in current session?
Hi: I have been working on few example using zeppelin. I have been trying to find a command that would list all *dataframes/RDDs* that has been created in current session. Anyone knows if there is any such commands available? Something similar to SparkSQL to list all temp tables : show tables; Thanks, Dhaval
MLlib Prefixspan implementation
I want to use prefixspan so I had a look at the code and the cited paper : Distributed PrefixSpan Algorithm Based on MapReduce. There is a result in the paper I didn't really undertstand and I could'nt find where it is used in the code. Suppose a sequence database S = {1,2...n}, a sequence a... is a length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a prefix of a length-(L-1) sequential pattern a...a, when the support count of a is not less than min_support, it is equal to obtaining a length-L sequential pattern a ... a from projected databases that obtaining a length-L sequential pattern a ... a from a sequence database S. According to the paper It's supposed to add a pruning step in the reduce function but I couldn't find where. This result seems to come from a previous paper : Wang Linlin, Fan Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J]. Computer Engineering, 2009, 35(23): 56-61 but it didn't help me to understand it and how it can improve the algorithm.
SparkR - can't create spark context - JVM not ready
Hello, I have previously successfully run SparkR in RStudio, with: Sys.setenv(SPARK_HOME=~/software/spark-1.4.1-bin-hadoop2.4) .libPaths(c(file.path(Sys.getenv(SPARK_HOME), R, lib), .libPaths())) library(SparkR) sc - sparkR.init(master=local[2],appName=SparkR-example) Then I tried putting some of it into an .Rprofile. It seemed to work to load the paths and SparkR, but I got an error when trying to create the sc. I then removed my .Rprofile, as well as .rstudio-desktop. However, I still cannot create the sc. Here is the error sc - sparkR.init(master=local[2],appName=SparkR-example) Launching java with spark-submit command ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit sparkr-shell /var/folders/p7/k1bpgmx93yd6pjq7dzf35gk8gn/T//RtmpOitA28/backend_port23377046db sh: ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit: No such file or directory Error in sparkR.init(master = local[2], appName = SparkR-example) : JVM is not ready after 10 seconds I suspected there was an incomplete process or something. I checked for any running R or Java processes and there were none. Has someone seen this type of error? I have the same error in both RStudio and in R shell (but not sparkR wrapper). Thanks, Deb
Spark SQL window functions (RowsBetween)
Hi All, I would like some clarification regarding window functions for Apache Spark 1.4.0 - https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html In particular, rowsBetween:

val w = Window.partitionBy("name").orderBy("id")
df.select(
  sum("price").over(w.rangeBetween(Long.MinValue, 2)),
  avg("price").over(w.rowsBetween(0, 4))
)

Are any of the window functions available without a Hive context? If the answer is no, is there any other way to accomplish this without using Hive? I need to compare the i-th row with the (i-1)-th row of col2 (sorted by col1). If item_i of the i-th row and item_(i-1) of the (i-1)-th row are different, then I need to increment the count of item_(i-1) by 1.

col1 | col2
-----+-------
   1 | item_1
   2 | item_1
   3 | item_2
   4 | item_1
   5 | item_2
   6 | item_1

In the above example, if we scan two rows at a time downwards, we see that row 2 and row 3 are different, therefore we add one to item_1. Next, we see that row 3 is different from row 4, so we add one to item_2. Continue until we end up with:

col2   | col3
-------+-----
item_1 | 2
item_2 | 2

Thanks, Mike.
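On the HiveContext question: in 1.4/1.5 the window functions (including lag/lead, rowsBetween and rangeBetween) are, to my knowledge, only resolved through a HiveContext; a plain SQLContext rejects them during analysis. A sketch of the row-vs-previous-row count using lag, with the sample data above (assuming sqlContext is a HiveContext, e.g. the default shell one in a build with Hive support):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val df = sc.parallelize(Seq(
    (1, "item_1"), (2, "item_1"), (3, "item_2"),
    (4, "item_1"), (5, "item_2"), (6, "item_1"))).toDF("col1", "col2")

  val w = Window.orderBy("col1")
  val withPrev = df.withColumn("prev", lag("col2", 1).over(w))
  // count a transition against the previous row's item whenever the item changes
  withPrev.where(col("prev").isNotNull && col("prev") !== col("col2"))
    .groupBy("prev").count()
    .show()   // expected: item_1 -> 2, item_2 -> 2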