Re: Time window on Processing Time
Hi, That's great. Thanks a lot. On Wed, Aug 30, 2017 at 10:44 AM, Tathagata Das <tathagata.das1...@gmail.com > wrote: > Yes, it can be! There is a sql function called current_timestamp() which > is self-explanatory. So I believe you should be able to do something like > > import org.apache.spark.sql.functions._ > > ds.withColumn("processingTime", current_timestamp()) > .groupBy(window("processingTime", "1 minute")) > .count() > > > On Mon, Aug 28, 2017 at 5:46 AM, madhu phatak <phatak@gmail.com> > wrote: > >> Hi, >> As I am playing with structured streaming, I observed that window >> function always requires a time column in input data.So that means it's >> event time. >> >> Is it possible to old spark streaming style window function based on >> processing time. I don't see any documentation on the same. >> >> -- >> Regards, >> Madhukara Phatak >> http://datamantra.io/ >> > > -- Regards, Madhukara Phatak http://datamantra.io/
Time window on Processing Time
Hi, As I am playing with structured streaming, I observed that the window function always requires a time column in the input data, which means it is event time. Is it possible to do an old Spark Streaming style window based on processing time? I don't see any documentation on the same. -- Regards, Madhukara Phatak http://datamantra.io/
Re: How to create SparkSession using SparkConf?
SparkSession.builder.config() takes SparkConf as parameter. You can use that to pass SparkConf as it is. https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/SparkSession.Builder.html#config(org.apache.spark.SparkConf) On Fri, Apr 28, 2017 at 11:40 AM, Yanbo Liangwrote: > StreamingContext is an old API, if you want to process streaming data, you > can use SparkSession directly. > FYI: http://spark.apache.org/docs/latest/structured- > streaming-programming-guide.html > > Thanks > Yanbo > > On Fri, Apr 28, 2017 at 12:12 AM, kant kodali wrote: > >> Actually one more question along the same line. This is about .getOrCreate() >> ? >> >> JavaStreamingContext doesn't seem to have a way to accept SparkSession >> object so does that mean a streaming context is not required? If so, how do >> I pass a lambda to .getOrCreate using SparkSession? The lambda that we >> normally pass when we call StreamingContext.getOrCreate. >> >> >> >> >> >> >> >> >> On Thu, Apr 27, 2017 at 8:47 AM, kant kodali wrote: >> >>> Ahhh Thanks much! I miss my sparkConf.setJars function instead of this >>> hacky comma separated jar names. >>> >>> On Thu, Apr 27, 2017 at 8:01 AM, Yanbo Liang wrote: >>> Could you try the following way? val spark = SparkSession.builder.appName("my-application").config("spark.jars", "a.jar, b.jar").getOrCreate() Thanks Yanbo On Thu, Apr 27, 2017 at 9:21 AM, kant kodali wrote: > I am using Spark 2.1 BTW. > > On Wed, Apr 26, 2017 at 3:22 PM, kant kodali > wrote: > >> Hi All, >> >> I am wondering how to create SparkSession using SparkConf object? >> Although I can see that most of the key value pairs we set in SparkConf >> we >> can also set in SparkSession or SparkSession.Builder however I don't see >> sparkConf.setJars which is required right? Because we want the driver jar >> to be distributed across the cluster whether we run it in client mode or >> cluster mode. so I am wondering how is this possible? >> >> Thanks! >> >> > >>> >> > -- Regards, Madhukara Phatak http://datamantra.io/
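[Editor's note] For reference, a minimal sketch of the approach in the first reply above — building a SparkConf (including setJars) and handing it to the builder; the jar paths here are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("my-application")
  .setJars(Seq("/path/to/a.jar", "/path/to/b.jar")) // illustrative paths

// SparkSession.Builder#config(SparkConf) copies every entry from the conf
val spark = SparkSession.builder.config(conf).getOrCreate()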
Re: Spark structured streaming is Micro batch?
Hi, Thank you for all those answers. The below is code I am trying out val records = sparkSession.read.format("csv").stream("/tmp/input") val re = records.write.format("parquet").trigger(ProcessingTime(100.seconds)). option("checkpointLocation", "/tmp/checkpoint") .startStream("/tmp/output") re.awaitTermination() In above code, I assume batch size is 100 seconds? But it doesn't seems to be that way. On Fri, May 6, 2016 at 3:14 PM, Sachin Aggarwal <different.sac...@gmail.com> wrote: > Hi Madhukara, > > What I understood from the code is that when ever runBatch return they > trigger constructBatch so whatever is processing time for a batch will be > ur batch time if u dnt specify a trigger. > > one flaw which i think in this is if your processing time keeps increasing > with amount of data , then this batch interval keeps on increasing, they > must put some boundary or some logic to block to prevent such case. > > here is one jira which i found related to this:- > https://github.com/apache/spark/pull/12725 > > > On Fri, May 6, 2016 at 2:50 PM, Deepak Sharma <deepakmc...@gmail.com> > wrote: > >> With Structured Streaming ,Spark would provide apis over spark sql engine. >> Its like once you have the structured stream and dataframe created out of >> this , you can do ad-hoc querying on the DF , which means you are actually >> querying the stram without having to store or transform. >> I have not used it yet but seems it will be like start streaming data >> from source as son as you define it. >> >> Thanks >> Deepak >> >> >> On Fri, May 6, 2016 at 1:37 PM, madhu phatak <phatak@gmail.com> >> wrote: >> >>> Hi, >>> As I was playing with new structured streaming API, I noticed that spark >>> starts processing as and when the data appears. It's no more seems like >>> micro batch processing. Is spark structured streaming will be an event >>> based processing? >>> >>> -- >>> Regards, >>> Madhukara Phatak >>> http://datamantra.io/ >>> >> >> >> >> -- >> Thanks >> Deepak >> www.bigdatabig.com >> www.keosha.net >> > > > > -- > > Thanks & Regards > > Sachin Aggarwal > 7760502772 > -- Regards, Madhukara Phatak http://datamantra.io/
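[Editor's note] Since the question above is left open in the thread: the trigger sets how often a new micro batch is started, not how much data it contains; each batch picks up whatever arrived since the previous one. A minimal sketch with the later stable API names (Spark 2.2+), assuming `spark` is a SparkSession and `schema` describes the csv files:

import org.apache.spark.sql.streaming.Trigger

val records = spark.readStream.format("csv").schema(schema).load("/tmp/input")

val query = records.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("path", "/tmp/output")
  .trigger(Trigger.ProcessingTime("100 seconds")) // a new micro batch every 100 seconds
  .start()

query.awaitTermination()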
Spark structured streaming is Micro batch?
Hi, As I was playing with the new structured streaming API, I noticed that spark starts processing as and when the data appears. It no longer seems like micro batch processing. Will spark structured streaming be event based processing? -- Regards, Madhukara Phatak http://datamantra.io/
Talk on Deep dive in Spark Dataframe API
Hi, Recently I gave a talk on a deep dive into the DataFrame API and SQL Catalyst. Video of the same is available on YouTube with slides and code. Please have a look if you are interested. http://blog.madhukaraphatak.com/anatomy-of-spark-dataframe-api/ -- Regards, Madhukara Phatak http://datamantra.io/
Running mllib from R in Spark 1.4
Hi, I have been playing with the SparkR API that is introduced in Spark 1.4. Can we use any MLlib functionality from R as of now? From the documentation it looks like we can only use the SQL/DataFrame functionality. I know there is a separate SparkR project, but it does not seem like it will be maintained in the future. So if I want to run machine learning from SparkR, what are the options as of now? -- Regards, Madhukara Phatak http://datamantra.io/
Talk on Deep dive into Spark Data source API
Hi, Recently I gave a talk on how to create spark data sources from scratch. Screencast of the same is available on Youtube with slides and code. Please have a look if you are interested. http://blog.madhukaraphatak.com/anatomy-of-spark-datasource-api/ -- Regards, Madhukara Phatak http://datamantra.io/
Re: Re: how to distributed run a bash shell in spark
Hi, You can use the pipe operator if you are running a shell script/perl script on some data. More information is on my blog: http://blog.madhukaraphatak.com/pipe-in-spark/. Regards, Madhukara Phatak http://datamantra.io/ On Mon, May 25, 2015 at 8:02 AM, luohui20...@sina.com wrote: Thanks Akhil, your code is a big help to me, 'cause a perl script is exactly the thing I wanna try to run in spark. I will have a try. Thanks & Best regards! San.Luo ----- Original Message ----- From: Akhil Das ak...@sigmoidanalytics.com To: 罗辉 luohui20...@sina.com Cc: user user@spark.apache.org Subject: Re: how to distributed run a bash shell in spark Date: 2015-05-25 00:53 You mean you want to execute some shell commands from spark? Here's something I tried a while back. https://github.com/akhld/spark-exploit Thanks Best Regards On Sun, May 24, 2015 at 4:53 PM, luohui20...@sina.com wrote: hello there, I am trying to run an app in which part of it needs to run a shell. How to run a shell distributed in a spark cluster? Thanks. Here's my code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ShellCompare {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        SparkConf conf = new SparkConf().setAppName("ShellCompare").setMaster("spark://master:7077").set("spark.executor.memory", "6g");
        JavaSparkContext sc = new JavaSparkContext(conf);
        for (int i = 1; i <= 21; i++) {
            execShell(i);
        }
        // execShell(1);
        sc.stop();
    }

    private static void execShell(int i) {
        String shpath = "/opt/sh/bin/sort.sh";
        Process process = null;
        String var = "/opt/data/shellcompare/chr" + i + ".txt /opt/data/shellcompare/samplechr" + i + ".txt /opt/data/shellcompare/result.txt 600";
        // String var = "/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600";
        String command2 = "sh " + shpath + " " + var;
        try {
            process = Runtime.getRuntime().exec(command2);
            process.waitFor();
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Thanks & Best regards! San.Luo
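[Editor's note] Relating to the pipe operator mentioned in the first reply, a minimal sketch (the script path and sample data are illustrative): each partition's records are written to the external script's stdin, one per line, and the script's stdout lines become the resulting RDD.

val data = sc.parallelize(Seq("record1", "record2", "record3"))
// launches the script once per partition; records go in on stdin, results come back from stdout
val piped = data.pipe("/opt/sh/bin/sort.sh")
piped.collect().foreach(println)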
Re: Spark SQL on large number of columns
Hi, I am using spark 1.3.1 Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote: And which version are you using? Sent from my iPhone On May 19, 2015, at 18:29, ayan guha guha.a...@gmail.com wrote: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying run spark sql aggregation on a file with 26k columns. No of rows is very small. I am running into issue that spark is taking huge amount of time to parse the sql and create a logical plan. Even if i have just one row, it's taking more than 1 hour just to get pass the parsing. Any idea how to optimize in these kind of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Spark SQL on large number of columns
Hi, I am trying to run a spark sql aggregation on a file with 26k columns. The number of rows is very small. I am running into an issue where spark takes a huge amount of time to parse the sql and create a logical plan. Even if I have just one row, it takes more than 1 hour just to get past the parsing. Any idea how to optimize these kinds of scenarios? Regards, Madhukara Phatak http://datamantra.io/
Re: Spark SQL on large number of columns
Hi, I have fields from field_0 to field_26000. The query selects max(cast($columnName as double)), min(cast($columnName as double)), avg(cast($columnName as double)), count(*) for all those 26000 fields in one query. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 3:59 PM, ayan guha guha.a...@gmail.com wrote: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying run spark sql aggregation on a file with 26k columns. No of rows is very small. I am running into issue that spark is taking huge amount of time to parse the sql and create a logical plan. Even if i have just one row, it's taking more than 1 hour just to get pass the parsing. Any idea how to optimize in these kind of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
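[Editor's note] Not from the original thread, but one way to avoid a single enormous SQL string is to build the aggregate expressions programmatically through the DataFrame functions API; a rough sketch, assuming df is the 26k-column DataFrame (very wide plans may still hit analyzer or codegen limits, as the later messages in this thread show):

import org.apache.spark.sql.functions._

val exprs = df.columns.flatMap { c =>
  val d = col(c).cast("double")
  Seq(max(d).as(s"max_$c"), min(d).as(s"min_$c"), avg(d).as(s"avg_$c"), count(d).as(s"cnt_$c"))
}
// agg takes one Column plus varargs, so split head and tail
val stats = df.agg(exprs.head, exprs.tail: _*)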
Re: Spark SQL on large number of columns
Hi, Some additional information: the table is backed by a csv file which is read using spark-csv from databricks. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:05 PM, madhu phatak phatak@gmail.com wrote: Hi, I have fields from field_0 to fied_26000. The query is select on max( cast($columnName as double)), |min(cast($columnName as double)), avg(cast($columnName as double)), count(*) for all those 26000 fields in one query. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 3:59 PM, ayan guha guha.a...@gmail.com wrote: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying run spark sql aggregation on a file with 26k columns. No of rows is very small. I am running into issue that spark is taking huge amount of time to parse the sql and create a logical plan. Even if i have just one row, it's taking more than 1 hour just to get pass the parsing. Any idea how to optimize in these kind of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Re: Spark SQL on large number of columns
Hi, Tested for calculating values for 300 columns. Analyser takes around 4 minutes to generate the plan. Is this normal? Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com wrote: Hi, I am using spark 1.3.1 Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote: And which version are you using 发自我的 iPhone 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying run spark sql aggregation on a file with 26k columns. No of rows is very small. I am running into issue that spark is taking huge amount of time to parse the sql and create a logical plan. Even if i have just one row, it's taking more than 1 hour just to get pass the parsing. Any idea how to optimize in these kind of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Re: Spark SQL on large number of columns
Hi, Another update, when run on more that 1000 columns I am getting Could not write class __wrapper$1$40255d281a0d4eacab06bcad6cf89b0d/__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d$$anonfun$wrapper$1$$anon$1 because it exceeds JVM code size limits. Method apply's code too large! Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 6:23 PM, madhu phatak phatak@gmail.com wrote: Hi, Tested with HiveContext also. It also take similar amount of time. To make the things clear, the following is select clause for a given column *aggregateStats( $columnName , max( cast($columnName as double)), |min(cast($columnName as double)), avg(cast($columnName as double)), count(*) )* aggregateStats is UDF generating case class to hold the values. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 5:57 PM, madhu phatak phatak@gmail.com wrote: Hi, Tested for calculating values for 300 columns. Analyser takes around 4 minutes to generate the plan. Is this normal? Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com wrote: Hi, I am using spark 1.3.1 Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote: And which version are you using 发自我的 iPhone 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying run spark sql aggregation on a file with 26k columns. No of rows is very small. I am running into issue that spark is taking huge amount of time to parse the sql and create a logical plan. Even if i have just one row, it's taking more than 1 hour just to get pass the parsing. Any idea how to optimize in these kind of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Re: Spark SQL on large number of columns
Hi, Tested with HiveContext also. It also take similar amount of time. To make the things clear, the following is select clause for a given column *aggregateStats( $columnName , max( cast($columnName as double)), |min(cast($columnName as double)), avg(cast($columnName as double)), count(*) )* aggregateStats is UDF generating case class to hold the values. Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 5:57 PM, madhu phatak phatak@gmail.com wrote: Hi, Tested for calculating values for 300 columns. Analyser takes around 4 minutes to generate the plan. Is this normal? Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com wrote: Hi, I am using spark 1.3.1 Regards, Madhukara Phatak http://datamantra.io/ On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote: And which version are you using 发自我的 iPhone 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道: can you kindly share your code? On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying run spark sql aggregation on a file with 26k columns. No of rows is very small. I am running into issue that spark is taking huge amount of time to parse the sql and create a logical plan. Even if i have just one row, it's taking more than 1 hour just to get pass the parsing. Any idea how to optimize in these kind of scenarios? Regards, Madhukara Phatak http://datamantra.io/ -- Best Regards, Ayan Guha
Spark JDBC data source API issue with mysql
Hi, I have been trying out the spark data source api with JDBC. The following is the code to get a DataFrame:

Try(hc.load("org.apache.spark.sql.jdbc", Map("url" -> dbUrl, "dbtable" -> s"($query)")))

By looking at test cases, I found that the query has to be inside brackets, otherwise it's treated as a table name. But when used with MySQL, the query inside the ( ) is treated as a derived table, which throws an exception. Is this the right way to pass queries to the jdbc source or am I missing something? Regards, Madhukara Phatak http://datamantra.io/
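[Editor's note] A hedged guess at the MySQL failure (the exact exception isn't shown in the mail): MySQL requires every derived table to have an alias, so adding one inside the dbtable option usually resolves it. A sketch using the same 1.3-era load API:

val options = Map(
  "url"     -> dbUrl,
  // MySQL insists on "(subquery) AS alias" for derived tables
  "dbtable" -> s"($query) AS t"
)
val df = hc.load("org.apache.spark.sql.jdbc", options)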
Re: Hive table creation - possible bug in Spark 1.3?
Hi, Hive table creation needs an extra step from 1.3. You can follow the following template:

df.registerTempTable(tableName)
hc.sql(s"create table $tableName as select * from $tableName")

This will save the table in hive with the given tableName. Regards, Madhukara Phatak http://datamantra.io/ On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust mich...@databricks.com wrote: Sorry for the confusion. We should be more clear about the semantics in the documentation. (PRs welcome :) ) .saveAsTable does not create a hive table, but instead creates a Spark Data Source table. Here the metadata is persisted into Hive, but hive cannot read the tables (as this API supports MLlib vectors, schema discovery, and other things that hive does not). If you want to create a hive table, use HiveQL and run a CREATE TABLE AS SELECT ... On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote: I wrote a few mails here regarding this issue. After further investigation I think there is a bug in Spark 1.3 in saving Hive tables. (hc is HiveContext) 1. Verify the needed configuration exists:

scala> hc.sql("set hive.exec.compress.output").collect
res4: Array[org.apache.spark.sql.Row] = Array([hive.exec.compress.output=true])
scala> hc.sql("set mapreduce.output.fileoutputformat.compress.codec").collect
res5: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
scala> hc.sql("set mapreduce.output.fileoutputformat.compress.type").collect
res6: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])

2. Load a DataFrame and save it as a table (path points to an existing file):

val saDF = hc.parquetFile(path)
saDF.count // yields 229764, i.e. the rdd exists
saDF.saveAsTable("test_hive_ms")

Now for a few interesting outputs: 1. Trying to query from the Hive CLI, the table exists but with the wrong output format: Failed with exception java.io.IOException:java.io.IOException: hdfs://10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet not a SequenceFile 2. Looking at the output files found that the files are '.parquet' and not '.snappy' 3.
Looking at the saveAsTable output shows that it actually store the table in both, wrong output format and without compression: 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:test_hive_ms, dbName:default, owner:hadoop, createTime:1429687014, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring, comment:from deserializer)], location:null, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe, parameters:{serialization.format=1, path=hdfs:// 10.166.157.97:9000/user/hive/warehouse/test_hive_ms} http://10.166.157.97:9000/user/hive/warehouse/test_hive_ms%7D), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[], parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]}, EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=org.apache.spark.sql.parquet}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE) So, the question is: do I miss some configuration here or should I open a bug? Thanks, Ophir
Re: Hive table creation - possible bug in Spark 1.3?
Hi Michael, Here https://issues.apache.org/jira/browse/SPARK-7084 is the jira issue and PR https://github.com/apache/spark/pull/5654 for the same. Please have a look. Regards, Madhukara Phatak http://datamantra.io/ On Thu, Apr 23, 2015 at 1:22 PM, madhu phatak phatak@gmail.com wrote: Hi, Hive table creation need an extra step from 1.3. You can follow the following template df.registerTempTable(tableName) hc.sql(screate table $tableName as select * from $tableName) this will save the table in hive with given tableName. Regards, Madhukara Phatak http://datamantra.io/ On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust mich...@databricks.com wrote: Sorry for the confusion. We should be more clear about the semantics in the documentation. (PRs welcome :) ) .saveAsTable does not create a hive table, but instead creates a Spark Data Source table. Here the metadata is persisted into Hive, but hive cannot read the tables (as this API support MLlib vectors, schema discovery, and other things that hive does not). If you want to create a hive table, use HiveQL and run a CREATE TABLE AS SELECT ... On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote: I wrote few mails here regarding this issue. After further investigation I think there is a bug in Spark 1.3 in saving Hive tables. (hc is HiveContext) 1. Verify the needed configuration exists: scala hc.sql(set hive.exec.compress.output).collect res4: Array[org.apache.spark.sql.Row] = Array([hive.exec.compress.output=true]) scala hc.sql(set mapreduce.output.fileoutputformat.compress.codec).collect res5: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec]) scala hc.sql(set mapreduce.output.fileoutputformat.compress.type).collect res6: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.type=BLOCK]) 2. Loading DataFrame and save as table (path point to exists file): val saDF = hc.parquetFile(path) saDF.count (count yield 229764 - i.e. the rdd exists) saDF.saveAsTable(test_hive_ms) Now for few interesting outputs: 1. Trying to query Hive CLI, the table exists but with wrong output format: Failed with exception java.io.IOException:java.io.IOException: hdfs:// 10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet not a SequenceFile 2. Looking at the output files found that files are '.parquet' and not '.snappy' 3. 
Looking at the saveAsTable output shows that it actually store the table in both, wrong output format and without compression: 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:test_hive_ms, dbName:default, owner:hadoop, createTime:1429687014, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring, comment:from deserializer)], location:null, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe, parameters:{serialization.format=1, path=hdfs:// 10.166.157.97:9000/user/hive/warehouse/test_hive_ms} http://10.166.157.97:9000/user/hive/warehouse/test_hive_ms%7D), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[], parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]}, EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=org.apache.spark.sql.parquet}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE) So, the question is: do I miss some configuration here or should I open a bug? Thanks, Ophir
Re: Is the Spark-1.3.1 support build with scala 2.8 ?
Hi, AFAIK it's only built with 2.10 and 2.11. You should integrate kafka_2.10.0-0.8.0 to make it work. Regards, Madhukara Phatak http://datamantra.io/ On Fri, Apr 24, 2015 at 9:22 AM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Does Spark-1.3.1 support a build with scala 2.8? And can it be integrated with kafka_2.8.0-0.8.0 if built with scala 2.10? Thanks.
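[Editor's note] A minimal build.sbt sketch (versions and layout are illustrative) showing how the Kafka integration is normally pulled in for a Scala 2.10 build, so no Scala 2.8 artifacts are needed:

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.3.1" % "provided",
  // transitively brings in a kafka_2.10 0.8.x client
  "org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)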
Anatomy of RDD : Deep dive into RDD data structure
Hi, Recently I gave a talk on RDD data structure which gives in depth understanding of spark internals. You can watch it on youtube https://www.youtube.com/watch?v=WVdyuVwWcBc. Also slides are on slideshare http://www.slideshare.net/datamantra/anatomy-of-rdd and code is on github https://github.com/phatak-dev/anatomy-of-rdd. Regards, Madhukara Phatak http://datamantra.io/
Re: MappedStream vs Transform API
Hi, Thank you for the response. Can I give a PR to use transform for all the functions like map,flatMap etc so they are consistent with other API's?. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com wrote: It's mostly for legacy reasons. First we had added all the MappedDStream, etc. and then later we realized we need to expose something that is more generic for arbitrary RDD-RDD transformations. It can be easily replaced. However, there is a slight value in having MappedDStream, for developers to learn about DStreams. TD On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote: Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation using a subclass when it can use an existing api? Unless there is a real difference, it feels like code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think these two ways are both OK for you to write streaming job, `transform` is a more general way for you to transform from one DStream to another if there’s no related DStream API (but have related RDD API). But using map maybe more straightforward and easy to understand. Thanks Jerry *From:* madhu phatak [mailto:phatak@gmail.com] *Sent:* Monday, March 16, 2015 4:32 PM *To:* user@spark.apache.org *Subject:* MappedStream vs Transform API Hi, Current implementation of map function in spark streaming looks as below. *def *map[U: ClassTag](mapFunc: T = U): DStream[U] = { *new *MappedDStream(*this*, context.sparkContext.clean(mapFunc)) } It creates an instance of MappedDStream which is a subclass of DStream. The same function can be also implemented using transform API *def map*[U: ClassTag](mapFunc: T = U): DStream[U] = this.transform(rdd = { rdd.map(mapFunc) }) Both implementation looks same. If they are same, is there any advantage having a subclass of DStream?. Why can't we just use transform API? Regards, Madhukara Phatak http://datamantra.io/
Re: why generateJob is a private API?
Hi, Thank you for the response. Regards, Madhukara Phatak http://datamantra.io/ On Tue, Mar 17, 2015 at 5:50 AM, Tathagata Das t...@databricks.com wrote: It was not really meant to be pubic and overridden. Because anything you want to do to generate jobs from RDDs can be done using DStream.foreachRDD On Sun, Mar 15, 2015 at 11:14 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to create a simple subclass of DStream. If I understand correctly, I should override *compute *lazy operations and *generateJob* for actions. But when I try to override, generateJob it gives error saying method is private to the streaming package. Is my approach is correct or am I missing something? Regards, Madhukara Phatak http://datamantra.io/
Re: MappedStream vs Transform API
Hi, Regards, Madhukara Phatak http://datamantra.io/ On Tue, Mar 17, 2015 at 2:31 PM, Tathagata Das t...@databricks.com wrote: That's not super essential, and hence hasn't been done till now. Even in core Spark there are MappedRDD, etc. even though all of them can be implemented by MapPartitionedRDD (may be the name is wrong). So its nice to maintain the consistency, MappedDStream creates MappedRDDs. :) Though this does not eliminate the possibility that we will do it. Maybe in future, if we find that maintaining these different DStreams is becoming a maintenance burden (its isn't yet), we may collapse them to use transform. We did so in the python API for exactly this reason. Ok. When I was going through source code it confused me to understand what were right extension points were. So I thought whoever go through the code may get into same situation. But if it's not super essential then ok. If you are interested in contributing to Spark Streaming, i can point you to a number of issues where your contributions will be more valuable. Yes please. TD On Tue, Mar 17, 2015 at 1:56 AM, madhu phatak phatak@gmail.com wrote: Hi, Thank you for the response. Can I give a PR to use transform for all the functions like map,flatMap etc so they are consistent with other API's?. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com wrote: It's mostly for legacy reasons. First we had added all the MappedDStream, etc. and then later we realized we need to expose something that is more generic for arbitrary RDD-RDD transformations. It can be easily replaced. However, there is a slight value in having MappedDStream, for developers to learn about DStreams. TD On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote: Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation using a subclass when it can use an existing api? Unless there is a real difference, it feels like code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think these two ways are both OK for you to write streaming job, `transform` is a more general way for you to transform from one DStream to another if there’s no related DStream API (but have related RDD API). But using map maybe more straightforward and easy to understand. Thanks Jerry *From:* madhu phatak [mailto:phatak@gmail.com] *Sent:* Monday, March 16, 2015 4:32 PM *To:* user@spark.apache.org *Subject:* MappedStream vs Transform API Hi, Current implementation of map function in spark streaming looks as below. *def *map[U: ClassTag](mapFunc: T = U): DStream[U] = { *new *MappedDStream(*this*, context.sparkContext.clean(mapFunc)) } It creates an instance of MappedDStream which is a subclass of DStream. The same function can be also implemented using transform API *def map*[U: ClassTag](mapFunc: T = U): DStream[U] = this.transform(rdd = { rdd.map(mapFunc) }) Both implementation looks same. If they are same, is there any advantage having a subclass of DStream?. Why can't we just use transform API? Regards, Madhukara Phatak http://datamantra.io/
Re: MappedStream vs Transform API
Hi, Sorry for the wrong formatting in the earlier mail. On Tue, Mar 17, 2015 at 2:31 PM, Tathagata Das t...@databricks.com wrote: That's not super essential, and hence hasn't been done till now. Even in core Spark there are MappedRDD, etc. even though all of them can be implemented by MapPartitionedRDD (may be the name is wrong). So its nice to maintain the consistency, MappedDStream creates MappedRDDs. :) Though this does not eliminate the possibility that we will do it. Maybe in future, if we find that maintaining these different DStreams is becoming a maintenance burden (its isn't yet), we may collapse them to use transform. We did so in the python API for exactly this reason. Ok. When I was going through source code it confused me to understand what were right extension points were. So I thought whoever go through the code may get into same situation. But if it's not super essential then ok. If you are interested in contributing to Spark Streaming, i can point you to a number of issues where your contributions will be more valuable. That will be great. TD On Tue, Mar 17, 2015 at 1:56 AM, madhu phatak phatak@gmail.com wrote: Hi, Thank you for the response. Can I give a PR to use transform for all the functions like map,flatMap etc so they are consistent with other API's?. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com wrote: It's mostly for legacy reasons. First we had added all the MappedDStream, etc. and then later we realized we need to expose something that is more generic for arbitrary RDD-RDD transformations. It can be easily replaced. However, there is a slight value in having MappedDStream, for developers to learn about DStreams. TD On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote: Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation using a subclass when it can use an existing api? Unless there is a real difference, it feels like code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think these two ways are both OK for you to write streaming job, `transform` is a more general way for you to transform from one DStream to another if there’s no related DStream API (but have related RDD API). But using map maybe more straightforward and easy to understand. Thanks Jerry *From:* madhu phatak [mailto:phatak@gmail.com] *Sent:* Monday, March 16, 2015 4:32 PM *To:* user@spark.apache.org *Subject:* MappedStream vs Transform API Hi, Current implementation of map function in spark streaming looks as below. *def *map[U: ClassTag](mapFunc: T = U): DStream[U] = { *new *MappedDStream(*this*, context.sparkContext.clean(mapFunc)) } It creates an instance of MappedDStream which is a subclass of DStream. The same function can be also implemented using transform API *def map*[U: ClassTag](mapFunc: T = U): DStream[U] = this.transform(rdd = { rdd.map(mapFunc) }) Both implementation looks same. If they are same, is there any advantage having a subclass of DStream?. Why can't we just use transform API? Regards, Madhukara Phatak http://datamantra.io/ Regards, Madhukara Phatak http://datamantra.io/
Re: MappedStream vs Transform API
Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation using a subclass when it can use an existing api? Unless there is a real difference, it feels like code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think these two ways are both OK for you to write streaming job, `transform` is a more general way for you to transform from one DStream to another if there’s no related DStream API (but have related RDD API). But using map maybe more straightforward and easy to understand. Thanks Jerry *From:* madhu phatak [mailto:phatak@gmail.com] *Sent:* Monday, March 16, 2015 4:32 PM *To:* user@spark.apache.org *Subject:* MappedStream vs Transform API Hi, Current implementation of map function in spark streaming looks as below. *def *map[U: ClassTag](mapFunc: T = U): DStream[U] = { *new *MappedDStream(*this*, context.sparkContext.clean(mapFunc)) } It creates an instance of MappedDStream which is a subclass of DStream. The same function can be also implemented using transform API *def map*[U: ClassTag](mapFunc: T = U): DStream[U] = this.transform(rdd = { rdd.map(mapFunc) }) Both implementation looks same. If they are same, is there any advantage having a subclass of DStream?. Why can't we just use transform API? Regards, Madhukara Phatak http://datamantra.io/
why generateJob is a private API?
Hi, I am trying to create a simple subclass of DStream. If I understand correctly, I should override compute for lazy operations and generateJob for actions. But when I try to override generateJob, it gives an error saying the method is private to the streaming package. Is my approach correct or am I missing something? Regards, Madhukara Phatak http://datamantra.io/
Re: Need Advice about reading lots of text files
Hi, Internally Spark uses HDFS api to handle file data. Have a look at HAR, Sequence file input format. More information on this cloudera blog http://blog.cloudera.com/blog/2009/02/the-small-files-problem/. Regards, Madhukara Phatak http://datamantra.io/ On Sun, Mar 15, 2015 at 9:59 PM, Pat Ferrel p...@occamsmachete.com wrote: Ah most interesting—thanks. So it seems sc.textFile(longFileList) has to read all metadata before starting the read for partitioning purposes so what you do is not use it? You create a task per file that reads one file (in parallel) per task without scanning for _all_ metadata. Can’t argue with the logic but perhaps Spark should incorporate something like this in sc.textFile? My case can’t be that unusual especially since I am periodically processing micro-batches from Spark Streaming. In fact Actually I have to scan HDFS to create the longFileList to begin with so get file status and therefore probably all the metadata needed by sc.textFile. Your method would save one scan, which is good. Might a better sc.textFile take a beginning URI, a file pattern regex, and a recursive flag? Then one scan could create all metadata automatically for a large subset of people using the function, something like sc.textFile(beginDir: String, filePattern: String = “^part.*”, recursive: Boolean = false) I fact it should be easy to create BetterSC that overrides the textFile method with a re-implementation that only requires one scan to get metadata. Just thinking on email… On Mar 14, 2015, at 11:11 AM, Michael Armbrust mich...@databricks.com wrote: Here is how I have dealt with many small text files (on s3 though this should generalize) in the past: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E FromMichael Armbrust mich...@databricks.comSubjectRe: S3NativeFileSystem inefficient implementation when calling sc.textFile DateThu, 27 Nov 2014 03:20:14 GMT In the past I have worked around this problem by avoiding sc.textFile(). Instead I read the data directly inside of a Spark job. Basically, you start with an RDD where each entry is a file in S3 and then flatMap that with something that reads the files and returns the lines. Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe Using this class you can do something like: sc.parallelize(s3n://mybucket/file1 :: s3n://mybucket/file1 ... :: Nil).flatMap(new ReadLinesSafe(_)) You can also build up the list of files by running a Spark job:https://gist.github.com/marmbrus/15e72f7bc22337cf6653 Michael On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com wrote: It’s a long story but there are many dirs with smallish part- files in them so we create a list of the individual files as input to sparkContext.textFile(fileList). I suppose we could move them and rename them to be contiguous part- files in one dir. Would that be better than passing in a long list of individual filenames? We could also make the part files much larger by collecting the smaller ones. But would any of this make a difference in IO speed? I ask because using the long file list seems to read, what amounts to a not very large data set rather slowly. If it were all in large part files in one dir I’d expect it to go much faster but this is just intuition. On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote: why can you not put them in a directory and read them as one input? 
you will get a task per file, but spark is very fast at executing many tasks (its not a jvm per task). On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com wrote: Any advice on dealing with a large number of separate input files? On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote: We have many text files that we need to read in parallel. We can create a comma delimited list of files to pass in to sparkContext.textFile(fileList). The list can get very large (maybe 1) and is all on hdfs. The question is: what is the most performant way to read them? Should they be broken up and read in groups appending the resulting RDDs or should we just pass in the entire list at once? In effect I’m asking if Spark does some optimization of whether we should do it explicitly. If the later, what rule might we use depending on our cluster setup? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
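[Editor's note] A bare-bones sketch of the "read the files inside the job" approach described above (paths and the reading logic are illustrative, not the linked gist; no error handling):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

val files = Seq("hdfs:///data/dir1/part-00000", "hdfs:///data/dir2/part-00000") // illustrative paths
val lines = sc.parallelize(files, files.size).flatMap { p =>
  val fs = FileSystem.get(new URI(p), new Configuration()) // created on the executor, nothing non-serializable is captured
  val in = fs.open(new Path(p))
  try Source.fromInputStream(in).getLines().toList finally in.close()
}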
MappedStream vs Transform API
Hi, The current implementation of the map function in spark streaming looks as below.

def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

It creates an instance of MappedDStream, which is a subclass of DStream. The same function can also be implemented using the transform API:

def map[U: ClassTag](mapFunc: T => U): DStream[U] = this.transform(rdd => {
  rdd.map(mapFunc)
})

Both implementations look the same. If they are the same, is there any advantage to having a subclass of DStream? Why can't we just use the transform API? Regards, Madhukara Phatak http://datamantra.io/
Re: Streaming: getting data from Cassandra based on input stream values
Hi, In that case, you can try the following. val joinRDD = kafkaStream.transform( streamRDD = { val ids = streamRDD.map(_._2).collect(); ids.map(userId = ctable.select(user_name).where(userid = ?, userId).toArray(0).get[String](0)) // better create a query which checks for all those ids at same time }) On Sat, Jan 24, 2015 at 3:32 AM, Greg Temchenko s...@dicefield.com wrote: Hi Madhu, Thanks for you response! But as I understand in this case you select all data from the Cassandra table. I don't wanna do it as it can be huge. I wanna just lookup some ids in the table. So it doesn't make sense for me how I can put some values from the streamRDD to the cassandra query (to where method). Greg On 1/23/15 1:11 AM, madhu phatak wrote: Hi, Seems like you want to get username for a give user id. You can use transform on the kafka stream to join two RDD's. The psuedo code looks like this val joinRDD = kafkaStream.transform( streamRDD = { streamRDD.map(value = (value._2,value._1)) join with (ctable.select(userid,username)) }) On Fri, Jan 23, 2015 at 10:12 AM, Greg Temchenko s...@dicefield.com wrote: Hi there, I think I have a basic question, but I'm sort of stuck with figuring out how to approach it, and I thought someone could point me to the right direction. I'd like pull some data from Cassandra based on values received from an input stream. Something like val ctable = ssc.cassandraTable(keyspace, users) val userNames = kafkaStream.flatMap { case (key,userid) = { val userName = ctable.select(user_name).where(userid = ?, userId).toArray(0).get[String](0) Some(userId, userName) } } While the Cassandra query works in Spark shell, it throws an exception when I used it inside flatMap: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 35, localhost): java.lang.NullPointerException: org.apache.spark.rdd.RDD.init(RDD.scala:125) com.datastax.spark.connector.rdd.CassandraRDD.init(CassandraRDD.scala:49) com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83) com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143) My understanding is that I cannot produce an RDD (Cassandra results) inside another RDD. But how should I approach the problem instead? Thanks, -- Greg -- Regards, Madhukara Phatak http://www.madhukaraphatak.com -- Greg -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
Re: save a histogram to a file
Hi, The histogram method returns normal scala types, not an RDD, so you will not have saveAsTextFile. You can use the makeRDD method to make an rdd out of the data and save it with saveAsObjectFile:

val hist = a.histogram(10)
val histRDD = sc.makeRDD(Seq(hist))
histRDD.saveAsObjectFile(path)

On Fri, Jan 23, 2015 at 5:37 AM, SK skrishna...@gmail.com wrote: Hi, histogram() returns an object that is a pair of Arrays. There appears to be no saveAsTextFile() for this paired object. Currently I am using the following to save the output to a file: val hist = a.histogram(10) val arr1 = sc.parallelize(hist._1).saveAsTextFile(file1) val arr2 = sc.parallelize(hist._2).saveAsTextFile(file2) Is there a simpler way to save the histogram() result to a file? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-a-histogram-to-a-file-tp21324.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
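[Editor's note] If a plain text file is preferred over an object file, a small sketch (the output path is illustrative) that pairs each bucket's boundaries with its count before saving:

val (buckets, counts) = a.histogram(10) // n+1 bucket boundaries, n counts
val rows = buckets.sliding(2).toSeq.zip(counts).map {
  case (Array(lo, hi), c) => s"$lo\t$hi\t$c"
}
sc.parallelize(rows).saveAsTextFile("/tmp/histogram")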
Re: DAG info
Hi, You can turn off these messages using log4j.properties. On Fri, Jan 2, 2015 at 1:51 PM, Robineast robin.e...@xense.co.uk wrote: Do you have some example code of what you are trying to do? Robin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DAG-info-tp20940p20941.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
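[Editor's note] For reference, a minimal log4j.properties sketch that silences the INFO-level DAG/scheduler messages (the property names follow the stock Spark conf/log4j.properties template; adjust the level to taste):

# show only warnings and errors on the console
log4j.rootCategory=WARN, console
# optionally quieten specific chatty packages further
log4j.logger.org.apache.spark.scheduler=WARN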
Re: broadcasting object issue
Hi, Just ran your code on spark-shell. If you replace val bcA = sc.broadcast(a) with val bcA = sc.broadcast(new B().getA) it seems to work. Not sure why. On Tue, Dec 23, 2014 at 9:12 AM, Henry Hung ythu...@winbond.com wrote: Hi All, I have a problem with broadcasting a serializable class object that is returned by another non-serializable class. Here is the sample code:

class A extends java.io.Serializable { def halo(): String = "halo" }
class B { def getA() = new A }

val list = List(1)
val b = new B
val a = b.getA
val p = sc.parallelize(list)

// this will fail
val bcA = sc.broadcast(a)
p.map(x => { bcA.value.halo() })

// this will succeed
val bcA = sc.broadcast(new A)
p.map(x => { bcA.value.halo() })

A is a serializable class, whereas B is not serializable. If I create a new object A through B's method getA(), the map process fails with the exception “org.apache.spark.SparkException: Task not serializable, Caused by: java.io.NotSerializableException: $iwC$$iwC$B”. I don't know why spark checks whether the B class is serializable or not. Is there a way to code around this? Best regards, Henry -- The privileged confidential information contained in this email is intended for use only by the addressees as indicated by the original sender of this email. If you are not the addressee indicated in this email or are not responsible for delivery of the email to such a person, please kindly reply to the sender indicating this fact and delete all copies of it from your computer and network server immediately. Your cooperation is highly appreciated. It is advised that any unauthorized use of confidential information of Winbond is strictly prohibited; and any information in this email irrelevant to the official business of Winbond shall be deemed as neither given nor endorsed by Winbond.
Re: Joins in Spark
Hi, You can map your vertices rdd as follows:

val pairVertices = verticesRDD.map(vertice => (vertice, null))

The above gives you a pair RDD. After the join, make sure that you remove the superfluous null value. On Tue, Dec 23, 2014 at 10:36 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have two RDDs, vertices and edges. Vertices is an RDD and edges is a pair RDD. I want to take a three way join of these two. Joins work only when both the RDDs are pair RDDs, right? So how am I supposed to take a three way join of these RDDs? Thank You -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
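[Editor's note] A short sketch of the full flow being suggested, assuming vertices: RDD[Long] and edges: RDD[(Long, String)] (the concrete types are illustrative):

val pairVertices = vertices.map(v => (v, null))
// both sides are now pair RDDs keyed by vertex id, so join applies
val joined = pairVertices.join(edges)                             // RDD[(Long, (Null, String))]
val cleaned = joined.mapValues { case (_, edgeAttr) => edgeAttr } // drop the superfluous null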
Re: reading files recursively using spark
Hi, You can use the FileInputFormat API of Hadoop and newAPIHadoopFile of spark to get recursion. For more on the topic you can refer here: http://stackoverflow.com/questions/8114579/using-fileinputformat-addinputpaths-to-recursively-add-hdfs-path On Fri, Dec 19, 2014 at 4:50 PM, Sean Owen so...@cloudera.com wrote: How about using the HDFS API to create a list of all the directories to read from, and passing them as a comma-joined string to sc.textFile? On Fri, Dec 19, 2014 at 11:13 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! What is an efficient way to read all files using spark from a directory and its sub-directories as well? Currently I move all files from the directory and its sub-directories into another temporary directory and then read them all using the sc.textFile method. But I want a method so that the cost of moving to a temporary directory may be saved. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reading-files-recursively-using-spark-tp20782.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
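[Editor's note] A rough sketch of that combination (the recursive flag is the Hadoop 2.x FileInputFormat setting; the input path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration(sc.hadoopConfiguration)
// tell FileInputFormat to descend into sub-directories
conf.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)

val lines = sc
  .newAPIHadoopFile("hdfs:///data/root", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map(_._2.toString)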
Re: SchemaRDD.sample problem
Hi, Can you clean up the code a little bit? It's hard to read what's going on. You can use pastebin or gist to put the code. On Wed, Dec 17, 2014 at 3:58 PM, Hao Ren inv...@gmail.com wrote: Hi, I am using SparkSQL on the 1.2.1 branch. The problem comes from the following 4-line code:

val t1: SchemaRDD = hiveContext hql "select * from product where is_new = 0"
val tb1: SchemaRDD = t1.sample(withReplacement = false, fraction = 0.05)
tb1.registerTempTable("t1_tmp")
(hiveContext sql "select count(*) from t1_tmp where is_new = 1") collect foreach println

We know that t1 contains only rows whose is_new field is zero. After sampling t1 by taking 5% of the rows, the sampled table should normally contain only rows where is_new = 0. However, line 4 gives a number around 5 by chance. That means there are some rows where is_new = 1 in the sampled table, which is not logically possible. I am not sure SchemaRDD.sample is doing its work well. Any idea? Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
Re: When will spark 1.2 released?
It’s on Maven Central already http://search.maven.org/#browse%7C717101892 On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com vboylin1...@gmail.com wrote: Hi, Does anyone know when spark 1.2 will be released? 1.2 has many great features that we can't wait for ;-) Sincerely, Lin wukang Sent from NetEase Mail Master -- Regards, Madhukara Phatak http://www.madhukaraphatak.com