Re: Time window on Processing Time

2017-08-30 Thread madhu phatak
Hi,
That's great. Thanks a lot.

On Wed, Aug 30, 2017 at 10:44 AM, Tathagata Das <tathagata.das1...@gmail.com
> wrote:

> Yes, it can be! There is a sql function called current_timestamp() which
> is self-explanatory. So I believe you should be able to do something like
>
> import org.apache.spark.sql.functions._
>
> ds.withColumn("processingTime", current_timestamp())
>   .groupBy(window("processingTime", "1 minute"))
>   .count()
>
>
> On Mon, Aug 28, 2017 at 5:46 AM, madhu phatak <phatak@gmail.com>
> wrote:
>
>> Hi,
>> As I am playing with structured streaming, I observed that the window
>> function always requires a time column in the input data. So that means it's
>> event time.
>>
>> Is it possible to do an old Spark Streaming-style window based on
>> processing time? I don't see any documentation on this.
>>
>> --
>> Regards,
>> Madhukara Phatak
>> http://datamantra.io/
>>
>
>


-- 
Regards,
Madhukara Phatak
http://datamantra.io/
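
For reference, a minimal end-to-end sketch of the approach suggested in the reply
above, assuming Spark 2.2+ structured streaming; the socket source, host/port and
trigger interval are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("processing-time-window").getOrCreate()
import spark.implicits._

// Illustrative source: a socket stream of text lines.
val lines = spark.readStream.format("socket")
  .option("host", "localhost").option("port", 9999).load()

// Stamp each row with the wall-clock time at which Spark sees it,
// then window on that column -- effectively a processing-time window.
val counts = lines
  .withColumn("processingTime", current_timestamp())
  .groupBy(window($"processingTime", "1 minute"))
  .count()

counts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
  .awaitTermination()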


Time window on Processing Time

2017-08-28 Thread madhu phatak
Hi,
As I am playing with structured streaming, I observed that the window function
always requires a time column in the input data. So that means it's event time.

Is it possible to do an old Spark Streaming-style window based on processing
time? I don't see any documentation on this.

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Re: How to create SparkSession using SparkConf?

2017-04-28 Thread madhu phatak
SparkSession.builder.config() takes a SparkConf as a parameter, so you can pass
your SparkConf as is.

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/SparkSession.Builder.html#config(org.apache.spark.SparkConf)
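
For reference, a small sketch of what that looks like end to end, including
setJars; the app name and jar paths are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Build the SparkConf exactly as before (including setJars),
// then hand the whole conf to the SparkSession builder.
val conf = new SparkConf()
  .setAppName("my-application")
  .setJars(Seq("/path/to/a.jar", "/path/to/b.jar"))

val spark = SparkSession.builder
  .config(conf)
  .getOrCreate()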

On Fri, Apr 28, 2017 at 11:40 AM, Yanbo Liang  wrote:

> StreamingContext is an old API, if you want to process streaming data, you
> can use SparkSession directly.
> FYI: http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html
>
> Thanks
> Yanbo
>
> On Fri, Apr 28, 2017 at 12:12 AM, kant kodali  wrote:
>
>> Actually, one more question along the same line, about .getOrCreate().
>>
>> JavaStreamingContext doesn't seem to have a way to accept a SparkSession
>> object, so does that mean a streaming context is not required? If so, how do
>> I pass a lambda to .getOrCreate using SparkSession? I mean the lambda that we
>> normally pass when we call StreamingContext.getOrCreate.
>>
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Apr 27, 2017 at 8:47 AM, kant kodali  wrote:
>>
>>> Ahhh Thanks much! I miss my sparkConf.setJars function instead of this
>>> hacky comma separated jar names.
>>>
>>> On Thu, Apr 27, 2017 at 8:01 AM, Yanbo Liang  wrote:
>>>
 Could you try the following way?

 val spark = SparkSession.builder.appName("my-application")
   .config("spark.jars", "a.jar, b.jar").getOrCreate()


 Thanks

 Yanbo


 On Thu, Apr 27, 2017 at 9:21 AM, kant kodali 
 wrote:

> I am using Spark 2.1 BTW.
>
> On Wed, Apr 26, 2017 at 3:22 PM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> I am wondering how to create a SparkSession using a SparkConf object.
>> Although I can see that most of the key-value pairs we set in SparkConf can
>> also be set in SparkSession or SparkSession.Builder, I don't see
>> sparkConf.setJars, which is required, right? Because we want the driver jar
>> to be distributed across the cluster whether we run it in client mode or
>> cluster mode. So I am wondering how this is possible?
>>
>> Thanks!
>>
>>
>

>>>
>>
>


-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Re: Spark structured streaming is Micro batch?

2016-05-07 Thread madhu phatak
Hi,
Thank you for all those answers.

Below is the code I am trying out:

val records = sparkSession.read.format("csv").stream("/tmp/input")

val re = records.write.format("parquet")
  .trigger(ProcessingTime(100.seconds))
  .option("checkpointLocation", "/tmp/checkpoint")
  .startStream("/tmp/output")


re.awaitTermination()


In the above code, I assume the batch interval is 100 seconds, but it doesn't
seem to work that way.
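
For reference, a sketch of roughly the same pipeline on the released structured
streaming API (assuming Spark 2.2+, where the trigger is set on the write side
and a streaming CSV source needs an explicit schema; the paths and the schema
are illustrative, and sparkSession is the session from above). The trigger
controls how often a new micro-batch is started, not a batch "size":

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// An illustrative schema; streaming csv sources require one up front.
val inputSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("value", StringType)))

val records = sparkSession.readStream
  .format("csv")
  .schema(inputSchema)
  .load("/tmp/input")

val query = records.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("path", "/tmp/output")
  .trigger(Trigger.ProcessingTime("100 seconds"))
  .start()

query.awaitTermination()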


On Fri, May 6, 2016 at 3:14 PM, Sachin Aggarwal <different.sac...@gmail.com>
wrote:

> Hi Madhukara,
>
> What I understood from the code is that whenever runBatch returns, they
> trigger constructBatch, so whatever the processing time of a batch is will be
> your batch interval if you don't specify a trigger.
>
> One flaw I see in this is that if your processing time keeps increasing with
> the amount of data, then this batch interval keeps increasing as well; they
> must put some boundary or blocking logic in place to prevent such a case.
>
> Here is one pull request I found related to this:
> https://github.com/apache/spark/pull/12725
>
>
> On Fri, May 6, 2016 at 2:50 PM, Deepak Sharma <deepakmc...@gmail.com>
> wrote:
>
>> With Structured Streaming, Spark would provide APIs over the Spark SQL engine.
>> Once you have the structured stream and a DataFrame created out of it, you
>> can do ad-hoc querying on the DataFrame, which means you are actually
>> querying the stream without having to store or transform it first.
>> I have not used it yet, but it seems it will start streaming data from the
>> source as soon as you define it.
>>
>> Thanks
>> Deepak
>>
>>
>> On Fri, May 6, 2016 at 1:37 PM, madhu phatak <phatak@gmail.com>
>> wrote:
>>
>>> Hi,
>>> As I was playing with new structured streaming API, I noticed that spark
>>> starts processing as and when the data appears. It's no more seems like
>>> micro batch processing. Is spark structured streaming will be an event
>>> based processing?
>>>
>>> --
>>> Regards,
>>> Madhukara Phatak
>>> http://datamantra.io/
>>>
>>
>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>
>
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772
>



-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Spark structured streaming is Micro batch?

2016-05-06 Thread madhu phatak
Hi,
As I was playing with the new structured streaming API, I noticed that Spark
starts processing as and when the data appears. It no longer seems like
micro-batch processing. Will Spark structured streaming be event-based
processing?

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Talk on Deep dive in Spark Dataframe API

2015-08-06 Thread madhu phatak
Hi,
Recently I gave a talk on a deep dive into the DataFrame API and the SQL
Catalyst optimizer. A video of the talk is available on YouTube, along with
slides and code. Please have a look if you are interested.

http://blog.madhukaraphatak.com/anatomy-of-spark-dataframe-api/

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Running mllib from R in Spark 1.4

2015-07-15 Thread madhu phatak
Hi,
I have been playing with the Spark R API introduced in the Spark 1.4 release.
Can we use any MLlib functionality from R as of now? From the documentation it
looks like we can only use the SQL/DataFrame functionality. I know there is a
separate SparkR project, but it does not seem like it will be maintained in the
future.

So if I want to run machine learning on SparkR, what are the options as of
now?

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Talk on Deep dive into Spark Data source API

2015-06-30 Thread madhu phatak
Hi,
Recently I gave a talk on how to create Spark data sources from scratch. A
screencast is available on YouTube, along with slides and code. Please have a
look if you are interested.

http://blog.madhukaraphatak.com/anatomy-of-spark-datasource-api/

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Re: Re: how to distributed run a bash shell in spark

2015-05-25 Thread madhu phatak
Hi,
You can use the pipe operator if you are running a shell or Perl script over
some data. More information is on my blog:
http://blog.madhukaraphatak.com/pipe-in-spark/.
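
A minimal sketch of RDD.pipe, assuming the script exists at the same path on
every worker node (the path and the sample data are illustrative):

// Each partition's records are written to the script's stdin, and the
// script's stdout lines come back as a new RDD[String].
val data = sc.parallelize(Seq("record1", "record2", "record3"))
val piped = data.pipe("/opt/scripts/process.pl")
piped.collect().foreach(println)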




Regards,
Madhukara Phatak
http://datamantra.io/

On Mon, May 25, 2015 at 8:02 AM, luohui20...@sina.com wrote:

 Thanks Akhil,

your code is a big help to me, because a Perl script is exactly the
 thing I want to try to run in Spark. I will give it a try.


 

 Thanks & Best regards!
 San.Luo

 ----- Original Message -----
 From: Akhil Das ak...@sigmoidanalytics.com
 To: 罗辉 (Luo Hui) luohui20...@sina.com
 Cc: user user@spark.apache.org
 Subject: Re: how to distributed run a bash shell in spark
 Date: 2015-05-25 00:53

 You mean you want to execute some shell commands from Spark? Here's
 something I tried a while back: https://github.com/akhld/spark-exploit

 Thanks
 Best Regards

 On Sun, May 24, 2015 at 4:53 PM, luohui20...@sina.com wrote:

 hello there

   I am trying to run an app in which part of it needs to run a shell
 script. How do I run a shell script distributed across a Spark cluster? Thanks.


 here's my code:

 import java.io.IOException;

 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;

 public class ShellCompare {
     public static void main(String[] args) {
         SparkConf conf = new SparkConf()
                 .setAppName("ShellCompare")
                 .setMaster("spark://master:7077")
                 .set("spark.executor.memory", "6g");
         JavaSparkContext sc = new JavaSparkContext(conf);

         for (int i = 1; i <= 21; i++) {
             execShell(i);
         }
         //execShell(1);
         sc.stop();
     }

     private static void execShell(int i) {
         String shpath = "/opt/sh/bin/sort.sh";
         Process process = null;

         String var = "/opt/data/shellcompare/chr" + i + ".txt "
                 + "/opt/data/shellcompare/samplechr" + i + ".txt "
                 + "/opt/data/shellcompare/result.txt 600";
         //String var = "/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600";
         String command2 = "sh " + shpath + " " + var;
         try {
             process = Runtime.getRuntime().exec(command2);
             process.waitFor();
         } catch (InterruptedException e1) {
             e1.printStackTrace();
         } catch (IOException e) {
             e.printStackTrace();
         }
     }
 }



 

 Thanks & Best regards!
 San.Luo





Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
I am using spark 1.3.1




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote:

  And which version are you using?

 Sent from my iPhone

 On 19 May 2015, at 18:29, ayan guha guha.a...@gmail.com wrote:

   Can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/




  --
 Best Regards,
 Ayan Guha




Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
I am trying to run a Spark SQL aggregation on a file with 26k columns. The
number of rows is very small. I am running into an issue where Spark takes a
huge amount of time to parse the SQL and create a logical plan. Even if I have
just one row, it takes more than 1 hour just to get past the parsing.
Any idea how to optimize these kinds of scenarios?


Regards,
Madhukara Phatak
http://datamantra.io/


Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
I have fields from field_0 to field_26000. The select clause, for each column, is

max(cast($columnName as double)),
min(cast($columnName as double)),
avg(cast($columnName as double)),
count(*)

for all those 26000 fields in one query.
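
For reference, a hedged sketch of how such a wide select could be generated with
the DataFrame API instead of one big SQL string (df and the field_N column names
are assumptions based on the description above):

import org.apache.spark.sql.functions._

val columnNames = (0 until 26000).map(i => s"field_$i")

// Build max/min/avg expressions for every column, plus a single count(*).
val aggExprs = columnNames.flatMap { c =>
  Seq(
    max(col(c).cast("double")).as(s"max_$c"),
    min(col(c).cast("double")).as(s"min_$c"),
    avg(col(c).cast("double")).as(s"avg_$c"))
} :+ count(lit(1)).as("cnt")

val stats = df.agg(aggExprs.head, aggExprs.tail: _*)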





Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 3:59 PM, ayan guha guha.a...@gmail.com wrote:

 can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


 Regards,
 Madhukara Phatak
 http://datamantra.io/




 --
 Best Regards,
 Ayan Guha



Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
An additional piece of information: the table is backed by a CSV file which is
read using spark-csv from Databricks.




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:05 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 I have fields from field_0 to fied_26000. The query is select on

 max( cast($columnName as double)),
|min(cast($columnName as double)), avg(cast($columnName as double)), 
 count(*)

 for all those 26000 fields in one query.





 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 3:59 PM, ayan guha guha.a...@gmail.com wrote:

 can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


 Regards,
 Madhukara Phatak
 http://datamantra.io/




 --
 Best Regards,
 Ayan Guha





Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Tested for calculating values for 300 columns. Analyser takes around 4
minutes to generate the plan. Is this normal?




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 I am using spark 1.3.1




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote:

  And which version are you using

  Sent from my iPhone

  On 19 May 2015, at 18:29, ayan guha guha.a...@gmail.com wrote:

   can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/




  --
 Best Regards,
 Ayan Guha





Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Another update: when run on more than 1000 columns I am getting

Could not write class
__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d/__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d$$anonfun$wrapper$1$$anon$1
because it exceeds JVM code size limits. Method apply's code too large!






Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 6:23 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 Tested with HiveContext also. It also take similar amount of time.

 To make the things clear, the following is select clause for a given column


 *aggregateStats( $columnName , max( cast($columnName as double)),   
 |min(cast($columnName as double)), avg(cast($columnName as double)), count(*) 
 )*

 aggregateStats is UDF generating case class to hold the values.








 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 5:57 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 Tested for calculating values for 300 columns. Analyser takes around 4
 minutes to generate the plan. Is this normal?




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I am using spark 1.3.1




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com
 wrote:

  And which version are you using

  Sent from my iPhone

  On 19 May 2015, at 18:29, ayan guha guha.a...@gmail.com wrote:

   can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No
 of rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/




  --
 Best Regards,
 Ayan Guha







Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Tested with HiveContext also; it also takes a similar amount of time.

To make things clear, the following is the select clause for a given column:

aggregateStats($columnName,
  max(cast($columnName as double)),
  min(cast($columnName as double)),
  avg(cast($columnName as double)),
  count(*))

aggregateStats is a UDF generating a case class to hold the values.








Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 5:57 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 Tested for calculating values for 300 columns. Analyser takes around 4
 minutes to generate the plan. Is this normal?




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I am using spark 1.3.1




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote:

  And which version are you using

 Sent from my iPhone

 On 19 May 2015, at 18:29, ayan guha guha.a...@gmail.com wrote:

   can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No
 of rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/




  --
 Best Regards,
 Ayan Guha






Spark JDBC data source API issue with mysql

2015-04-27 Thread madhu phatak
Hi,
 I have been trying out the Spark data source API with JDBC. The following is
the code to get a DataFrame:

 Try(hc.load("org.apache.spark.sql.jdbc", Map("url" -> dbUrl, "dbtable" -> s"($query)")))


By looking at the test cases, I found that the query has to be inside
parentheses, otherwise it is treated as a table name. But when used with MySQL,
the query inside the ( ) is treated as a derived table, which throws an
exception. Is this the right way to pass queries to the JDBC source, or am I
missing something?
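
For reference, a hedged sketch of the usual MySQL workaround: a derived table
needs an alias, so wrapping the query as "(...) AS t" tends to satisfy both
sides (hc, dbUrl and query are assumed to be defined as above; the alias name is
illustrative):

import scala.util.Try

val df = Try(
  hc.load(
    "org.apache.spark.sql.jdbc",
    Map("url" -> dbUrl, "dbtable" -> s"($query) AS t")))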


Regards,
Madhukara Phatak
http://datamantra.io/


Re: Hive table creation - possible bug in Spark 1.3?

2015-04-23 Thread madhu phatak
Hi,
 Hive table creation needs an extra step from 1.3 onwards. You can follow this
template:

 df.registerTempTable(tableName)

 hc.sql(s"create table $tableName as select * from $tableName")

This will save the table in Hive with the given tableName.









Regards,
Madhukara Phatak
http://datamantra.io/

On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust mich...@databricks.com
wrote:

 Sorry for the confusion.  We should be more clear about the semantics in
 the documentation. (PRs welcome :) )

 .saveAsTable does not create a hive table, but instead creates a Spark
 Data Source table.  Here the metadata is persisted into Hive, but hive
 cannot read the tables (as this API support MLlib vectors, schema
 discovery, and other things that hive does not).  If you want to create a
 hive table, use HiveQL and run a CREATE TABLE AS SELECT ...

 On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote:

 I wrote few mails here regarding this issue.
 After further investigation I think there is a bug in Spark 1.3 in saving
 Hive tables.

 (hc is HiveContext)

 1. Verify the needed configuration exists:
  scala> hc.sql("set hive.exec.compress.output").collect
  res4: Array[org.apache.spark.sql.Row] = Array([hive.exec.compress.output=true])
  scala> hc.sql("set mapreduce.output.fileoutputformat.compress.codec").collect
  res5: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
  scala> hc.sql("set mapreduce.output.fileoutputformat.compress.type").collect
  res6: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
  2. Load a DataFrame and save it as a table (the path points to an existing file):
  val saDF = hc.parquetFile(path)
  saDF.count

  (count yields 229764 - i.e. the RDD exists)
  saDF.saveAsTable("test_hive_ms")

 Now for few interesting outputs:
 1. Trying to query Hive CLI, the table exists but with wrong output
 format:
 Failed with exception java.io.IOException:java.io.IOException: hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet
 not a SequenceFile
  2. Looking at the output files, I found that the files are '.parquet' and not
  '.snappy'.
  3. Looking at the saveAsTable output shows that it actually stores the table
  with both the wrong output format and no compression:
 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
 Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
 createTime:1429687014, lastAccessTime:0, retention:0,
 sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring,
 comment:from deserializer)], location:null,
 inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
 outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
 compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
 serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
 parameters:{serialization.format=1, path=hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms}
 http://10.166.157.97:9000/user/hive/warehouse/test_hive_ms%7D),
 bucketCols:[], sortCols:[], parameters:{},
 skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
 skewedColValueLocationMaps:{})), partitionKeys:[],
 parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]},
 EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
 spark.sql.sources.provider=org.apache.spark.sql.parquet},
 viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)

 So, the question is: do I miss some configuration here or should I open a
 bug?

 Thanks,
 Ophir





Re: Hive table creation - possible bug in Spark 1.3?

2015-04-23 Thread madhu phatak
Hi Michael,
Here is the JIRA issue (https://issues.apache.org/jira/browse/SPARK-7084)
and the PR (https://github.com/apache/spark/pull/5654) for the same. Please
have a look.




Regards,
Madhukara Phatak
http://datamantra.io/

On Thu, Apr 23, 2015 at 1:22 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
  Hive table creation need an extra step from 1.3. You can follow the
 following template

  df.registerTempTable(tableName)

  hc.sql(screate table $tableName as select * from $tableName)

 this will save the table in hive with given tableName.









 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust mich...@databricks.com
 wrote:

 Sorry for the confusion.  We should be more clear about the semantics in
 the documentation. (PRs welcome :) )

 .saveAsTable does not create a hive table, but instead creates a Spark
 Data Source table.  Here the metadata is persisted into Hive, but hive
 cannot read the tables (as this API support MLlib vectors, schema
 discovery, and other things that hive does not).  If you want to create a
 hive table, use HiveQL and run a CREATE TABLE AS SELECT ...

 On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote:

 I wrote few mails here regarding this issue.
 After further investigation I think there is a bug in Spark 1.3 in
 saving Hive tables.

 (hc is HiveContext)

 1. Verify the needed configuration exists:
 scala hc.sql(set hive.exec.compress.output).collect
 res4: Array[org.apache.spark.sql.Row] =
 Array([hive.exec.compress.output=true])
 scala hc.sql(set
 mapreduce.output.fileoutputformat.compress.codec).collect
 res5: Array[org.apache.spark.sql.Row] =
 Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
 scala hc.sql(set
 mapreduce.output.fileoutputformat.compress.type).collect
 res6: Array[org.apache.spark.sql.Row] =
 Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
 2. Loading DataFrame and save as table (path point to exists file):
 val saDF = hc.parquetFile(path)
 saDF.count

 (count yield 229764 - i.e. the rdd exists)
 saDF.saveAsTable(test_hive_ms)

 Now for few interesting outputs:
 1. Trying to query Hive CLI, the table exists but with wrong output
 format:
 Failed with exception java.io.IOException:java.io.IOException: hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet
 not a SequenceFile
 2. Looking at the output files found that files are '.parquet' and not
 '.snappy'
 3. Looking at the saveAsTable output shows that it actually store the
 table in both, wrong output format and without compression:
 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
 Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
 createTime:1429687014, lastAccessTime:0, retention:0,
 sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring,
 comment:from deserializer)], location:null,
 inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
 outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
 compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
 serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
 parameters:{serialization.format=1, path=hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms}
 http://10.166.157.97:9000/user/hive/warehouse/test_hive_ms%7D),
 bucketCols:[], sortCols:[], parameters:{},
 skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
 skewedColValueLocationMaps:{})), partitionKeys:[],
 parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]},
 EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
 spark.sql.sources.provider=org.apache.spark.sql.parquet},
 viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)

 So, the question is: do I miss some configuration here or should I open
 a bug?

 Thanks,
 Ophir






Re: Is the Spark-1.3.1 support build with scala 2.8 ?

2015-04-23 Thread madhu phatak
Hi,
AFAIK it's only built with 2.10 and 2.11. You should integrate
kafka_2.10.0-0.8.0 to make it work.




Regards,
Madhukara Phatak
http://datamantra.io/

On Fri, Apr 24, 2015 at 9:22 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:

 Does Spark 1.3.1 support being built with Scala 2.8? And can it be
 integrated with kafka_2.8.0-0.8.0 if built with Scala 2.10?

 Thanks.



Anatomy of RDD : Deep dive into RDD data structure

2015-03-31 Thread madhu phatak
Hi,
 Recently I gave a talk on the RDD data structure which gives an in-depth
understanding of Spark internals. You can watch it on YouTube
(https://www.youtube.com/watch?v=WVdyuVwWcBc). The slides are on SlideShare
(http://www.slideshare.net/datamantra/anatomy-of-rdd) and the code is on GitHub
(https://github.com/phatak-dev/anatomy-of-rdd).



Regards,
Madhukara Phatak
http://datamantra.io/


Re: MappedStream vs Transform API

2015-03-17 Thread madhu phatak
Hi,
 Thank you for the response.

 Can I submit a PR to use transform for all the functions like map, flatMap,
etc. so they are consistent with the other APIs?

Regards,
Madhukara Phatak
http://datamantra.io/

On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com wrote:

 It's mostly for legacy reasons. First we had added all the MappedDStream,
 etc. and then later we realized we need to expose something that is more
 generic for arbitrary RDD-RDD transformations. It can be easily replaced.
 However, there is a slight value in having MappedDStream, for developers to
 learn about DStreams.

 TD

 On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
  Thanks for the response. I understand that part. But I am asking why the
 internal implementation using a subclass when it can use an existing api?
 Unless there is a real difference, it feels like code smell to me.


 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think these two ways are both OK for you to write streaming job,
 `transform` is a more general way for you to transform from one DStream to
 another if there’s no related DStream API (but have related RDD API). But
 using map maybe more straightforward and easy to understand.



 Thanks

 Jerry



 *From:* madhu phatak [mailto:phatak@gmail.com]
 *Sent:* Monday, March 16, 2015 4:32 PM
 *To:* user@spark.apache.org
 *Subject:* MappedStream vs Transform API



 Hi,

   Current implementation of map function in spark streaming looks as
 below.



   *def *map[U: ClassTag](mapFunc: T = U): DStream[U] = {

   *new *MappedDStream(*this*, context.sparkContext.clean(mapFunc))
 }

  It creates an instance of MappedDStream which is a subclass of DStream.



 The same function can be also implemented using transform API



 *def map*[U: ClassTag](mapFunc: T = U): DStream[U] =

 this.transform(rdd = {

   rdd.map(mapFunc)
 })



 Both implementation looks same. If they are same, is there any advantage
 having a subclass of DStream?. Why can't we just use transform API?





 Regards,
 Madhukara Phatak
 http://datamantra.io/






Re: why generateJob is a private API?

2015-03-17 Thread madhu phatak
Hi,
 Thank you for the response.

Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, Mar 17, 2015 at 5:50 AM, Tathagata Das t...@databricks.com wrote:

 It was not really meant to be public and overridden, because anything you
 want to do to generate jobs from RDDs can be done using DStream.foreachRDD.

 On Sun, Mar 15, 2015 at 11:14 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
  I am trying to create a simple subclass of DStream.  If I understand
 correctly, I should override *compute *lazy operations and *generateJob*
 for actions. But when I try to override, generateJob it gives error saying
 method is private to the streaming package. Is my approach is correct or am
 I  missing something?


 Regards,
 Madhukara Phatak
 http://datamantra.io/





Re: MappedStream vs Transform API

2015-03-17 Thread madhu phatak
Hi,
Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, Mar 17, 2015 at 2:31 PM, Tathagata Das t...@databricks.com wrote:

 That's not super essential, and hence hasn't been done till now. Even in
 core Spark there are MappedRDD, etc., even though all of them could be
 implemented with MapPartitionedRDD (maybe the name is wrong). So it's nice to
 maintain the consistency: MappedDStream creates MappedRDDs. :)
 Though this does not eliminate the possibility that we will do it. Maybe
 in the future, if we find that maintaining these different DStreams is becoming
 a maintenance burden (it isn't yet), we may collapse them to use
 transform. We did so in the Python API for exactly this reason.


  OK. When I was going through the source code, it was hard for me to understand
what the right extension points were. So I thought whoever goes through
the code may get into the same situation. But if it's not super essential, then
OK.


 If you are interested in contributing to Spark Streaming, i can point you
 to a number of issues where your contributions will be more valuable.


   Yes please.


 TD

 On Tue, Mar 17, 2015 at 1:56 AM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
  Thank you for the  response.

  Can I give a PR to use transform for all the functions like map,flatMap
 etc so they are consistent with other API's?.

 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com
 wrote:

 It's mostly for legacy reasons. First we had added all the
 MappedDStream, etc. and then later we realized we need to expose something
 that is more generic for arbitrary RDD-RDD transformations. It can be
 easily replaced. However, there is a slight value in having MappedDStream,
 for developers to learn about DStreams.

 TD

 On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
  Thanks for the response. I understand that part. But I am asking why
 the internal implementation using a subclass when it can use an existing
 api? Unless there is a real difference, it feels like code smell to me.


 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think these two ways are both OK for you to write streaming job,
 `transform` is a more general way for you to transform from one DStream to
 another if there’s no related DStream API (but have related RDD API). But
 using map maybe more straightforward and easy to understand.



 Thanks

 Jerry



 *From:* madhu phatak [mailto:phatak@gmail.com]
 *Sent:* Monday, March 16, 2015 4:32 PM
 *To:* user@spark.apache.org
 *Subject:* MappedStream vs Transform API



 Hi,

   Current implementation of map function in spark streaming looks as
 below.



   *def *map[U: ClassTag](mapFunc: T = U): DStream[U] = {

   *new *MappedDStream(*this*, context.sparkContext.clean(mapFunc))
 }

  It creates an instance of MappedDStream which is a subclass of
 DStream.



 The same function can be also implemented using transform API



 *def map*[U: ClassTag](mapFunc: T = U): DStream[U] =

 this.transform(rdd = {

   rdd.map(mapFunc)
 })



 Both implementation looks same. If they are same, is there any
 advantage having a subclass of DStream?. Why can't we just use transform
 API?





 Regards,
 Madhukara Phatak
 http://datamantra.io/








Re: MappedStream vs Transform API

2015-03-17 Thread madhu phatak
Hi,
 Sorry for the wrong formatting in the earlier mail.

On Tue, Mar 17, 2015 at 2:31 PM, Tathagata Das t...@databricks.com wrote:

 That's not super essential, and hence hasn't been done till now. Even in
 core Spark there are MappedRDD, etc. even though all of them can be
 implemented by MapPartitionedRDD (may be the name is wrong). So its nice to
 maintain the consistency, MappedDStream creates MappedRDDs. :)
 Though this does not eliminate the possibility that we will do it. Maybe
 in future, if we find that maintaining these different DStreams is becoming
 a maintenance burden (its isn't yet), we may collapse them to use
 transform. We did so in the python API for exactly this reason.


  OK. When I was going through the source code, it was hard for me to understand
what the right extension points were. So I thought whoever goes through
the code may get into the same situation. But if it's not super essential, then
OK.



 If you are interested in contributing to Spark Streaming, i can point you
 to a number of issues where your contributions will be more valuable.


   That will be great.



 TD

 On Tue, Mar 17, 2015 at 1:56 AM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
  Thank you for the  response.

  Can I give a PR to use transform for all the functions like map,flatMap
 etc so they are consistent with other API's?.

 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com
 wrote:

 It's mostly for legacy reasons. First we had added all the
 MappedDStream, etc. and then later we realized we need to expose something
 that is more generic for arbitrary RDD-RDD transformations. It can be
 easily replaced. However, there is a slight value in having MappedDStream,
 for developers to learn about DStreams.

 TD

 On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
  Thanks for the response. I understand that part. But I am asking why
 the internal implementation using a subclass when it can use an existing
 api? Unless there is a real difference, it feels like code smell to me.


 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think these two ways are both OK for you to write streaming job,
 `transform` is a more general way for you to transform from one DStream to
 another if there’s no related DStream API (but have related RDD API). But
 using map maybe more straightforward and easy to understand.



 Thanks

 Jerry



 *From:* madhu phatak [mailto:phatak@gmail.com]
 *Sent:* Monday, March 16, 2015 4:32 PM
 *To:* user@spark.apache.org
 *Subject:* MappedStream vs Transform API



 Hi,

   Current implementation of map function in spark streaming looks as
 below.



   *def *map[U: ClassTag](mapFunc: T = U): DStream[U] = {

   *new *MappedDStream(*this*, context.sparkContext.clean(mapFunc))
 }

  It creates an instance of MappedDStream which is a subclass of
 DStream.



 The same function can be also implemented using transform API



 *def map*[U: ClassTag](mapFunc: T = U): DStream[U] =

 this.transform(rdd = {

   rdd.map(mapFunc)
 })



 Both implementation looks same. If they are same, is there any
 advantage having a subclass of DStream?. Why can't we just use transform
 API?





 Regards,
 Madhukara Phatak
 http://datamantra.io/







Regards,
Madhukara Phatak
http://datamantra.io/


Re: MappedStream vs Transform API

2015-03-16 Thread madhu phatak
Hi,
 Thanks for the response. I understand that part. But I am asking why the
internal implementation uses a subclass when it can use an existing API.
Unless there is a real difference, it feels like a code smell to me.


Regards,
Madhukara Phatak
http://datamantra.io/

On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote:

  I think these two ways are both OK for you to write a streaming job.
 `transform` is a more general way for you to transform from one DStream to
 another if there's no related DStream API (but there is a related RDD API). But
 using map may be more straightforward and easier to understand.



 Thanks

 Jerry



 *From:* madhu phatak [mailto:phatak@gmail.com]
 *Sent:* Monday, March 16, 2015 4:32 PM
 *To:* user@spark.apache.org
 *Subject:* MappedStream vs Transform API



 Hi,

   Current implementation of map function in spark streaming looks as below.



   *def *map[U: ClassTag](mapFunc: T = U): DStream[U] = {

   *new *MappedDStream(*this*, context.sparkContext.clean(mapFunc))
 }

  It creates an instance of MappedDStream which is a subclass of DStream.



 The same function can be also implemented using transform API



 *def map*[U: ClassTag](mapFunc: T = U): DStream[U] =

 this.transform(rdd = {

   rdd.map(mapFunc)
 })



 Both implementation looks same. If they are same, is there any advantage
 having a subclass of DStream?. Why can't we just use transform API?





 Regards,
 Madhukara Phatak
 http://datamantra.io/



why generateJob is a private API?

2015-03-16 Thread madhu phatak
Hi,
 I am trying to create a simple subclass of DStream.  If I understand
correctly, I should override *compute *lazy operations and *generateJob*
for actions. But when I try to override, generateJob it gives error saying
method is private to the streaming package. Is my approach is correct or am
I  missing something?


Regards,
Madhukara Phatak
http://datamantra.io/


Re: Need Advice about reading lots of text files

2015-03-16 Thread madhu phatak
Hi,
Internally Spark uses the HDFS API to handle file data. Have a look at HAR and
the SequenceFile input format. There is more information in this Cloudera blog
post: http://blog.cloudera.com/blog/2009/02/the-small-files-problem/.

Regards,
Madhukara Phatak
http://datamantra.io/

On Sun, Mar 15, 2015 at 9:59 PM, Pat Ferrel p...@occamsmachete.com wrote:

 Ah most interesting—thanks.

 So it seems sc.textFile(longFileList) has to read all metadata before
 starting the read for partitioning purposes so what you do is not use it?

 You create a task per file that reads one file (in parallel) per task
 without scanning for _all_ metadata. Can’t argue with the logic but perhaps
 Spark should incorporate something like this in sc.textFile? My case can’t
 be that unusual especially since I am periodically processing micro-batches
 from Spark Streaming. In fact Actually I have to scan HDFS to create the
 longFileList to begin with so get file status and therefore probably all
 the metadata needed by sc.textFile. Your method would save one scan, which
 is good.

 Might a better sc.textFile take a beginning URI, a file pattern regex, and
 a recursive flag? Then one scan could create all metadata automatically for
 a large subset of people using the function, something like

 sc.textFile(beginDir: String, filePattern: String = “^part.*”,
 recursive: Boolean = false)

 I fact it should be easy to create BetterSC that overrides the textFile
 method with a re-implementation that only requires one scan to get
 metadata.

 Just thinking on email…

 On Mar 14, 2015, at 11:11 AM, Michael Armbrust mich...@databricks.com
 wrote:


 Here is how I have dealt with many small text files (on s3 though this
 should generalize) in the past:

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E




  From: Michael Armbrust mich...@databricks.com
  Subject: Re: S3NativeFileSystem inefficient implementation when calling sc.textFile
  Date: Thu, 27 Nov 2014 03:20:14 GMT

 In the past I have worked around this problem by avoiding sc.textFile().
 Instead I read the data directly inside of a Spark job.  Basically, you
 start with an RDD where each entry is a file in S3 and then flatMap that
 with something that reads the files and returns the lines.

 Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe

 Using this class you can do something like:

  sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file1" ... ::
  Nil).flatMap(new ReadLinesSafe(_))

  You can also build up the list of files by running a Spark job:
  https://gist.github.com/marmbrus/15e72f7bc22337cf6653

 Michael
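
A self-contained sketch of that idea (not the ReadLinesSafe gist itself): start
from an RDD of paths and read each file inside the job via the Hadoop FileSystem
API; the file paths are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Illustrative list of input paths; in practice it would be built by listing HDFS/S3.
val files = Seq("hdfs:///data/dir1/part-00000", "hdfs:///data/dir2/part-00000")

val lines = sc.parallelize(files).flatMap { p =>
  val path = new Path(p)
  // Resolve the right FileSystem (hdfs, s3n, ...) from the path's scheme.
  val fs = FileSystem.get(path.toUri, new Configuration())
  val in = fs.open(path)
  try Source.fromInputStream(in).getLines().toList
  finally in.close()
}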


 On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com
 wrote:

 It’s a long story but there are many dirs with smallish part- files
 in them so we create a list of the individual files as input
 to sparkContext.textFile(fileList). I suppose we could move them and rename
 them to be contiguous part- files in one dir. Would that be better than
 passing in a long list of individual filenames? We could also make the part
 files much larger by collecting the smaller ones. But would any of this
 make a difference in IO speed?

 I ask because using the long file list seems to read, what amounts to a
 not very large data set rather slowly. If it were all in large part files
 in one dir I’d expect it to go much faster but this is just intuition.


 On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote:

 why can you not put them in a directory and read them as one input? you
 will get a task per file, but spark is very fast at executing many tasks
 (its not a jvm per task).

 On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com
 wrote:

 Any advice on dealing with a large number of separate input files?


 On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote:

 We have many text files that we need to read in parallel. We can create
 a comma delimited list of files to pass in to
 sparkContext.textFile(fileList). The list can get very large (maybe 1)
 and is all on hdfs.

 The question is: what is the most performant way to read them? Should
 they be broken up and read in groups appending the resulting RDDs or should
 we just pass in the entire list at once? In effect I’m asking if Spark does
 some optimization of whether we should do it explicitly. If the later, what
 rule might we use depending on our cluster setup?












MappedStream vs Transform API

2015-03-16 Thread madhu phatak
Hi,
  The current implementation of the map function in Spark Streaming looks as below:

  def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }

It creates an instance of MappedDStream, which is a subclass of DStream.

The same function can also be implemented using the transform API:

  def map[U: ClassTag](mapFunc: T => U): DStream[U] =
    this.transform(rdd => {
      rdd.map(mapFunc)
    })

Both implementations look the same. If they are the same, is there any advantage
to having a subclass of DStream? Why can't we just use the transform API?


Regards,
Madhukara Phatak
http://datamantra.io/


Re: Streaming: getting data from Cassandra based on input stream values

2015-01-23 Thread madhu phatak
Hi,
In that case, you can try the following.

val joinRDD = kafkaStream.transform { streamRDD =>
  val ids = streamRDD.map(_._2).collect()
  val names = ids.map(userId =>
    (userId, ctable.select("user_name").where("userid = ?", userId).toArray(0).get[String](0)))
  // better: build a single query which checks for all those ids at the same time
  streamRDD.context.parallelize(names)
}
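
A hedged alternative for that "single query" idea, assuming a
spark-cassandra-connector version that provides joinWithCassandraTable; the
keyspace, table and column names are illustrative:

import com.datastax.spark.connector._

val userNames = kafkaStream.transform { streamRDD =>
  streamRDD
    .map { case (_, userId) => Tuple1(userId) }      // key each record by the table's primary key
    .joinWithCassandraTable("keyspace", "users")     // per-key lookups instead of a full table scan
    .map { case (Tuple1(userId), row) => (userId, row.getString("user_name")) }
}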


On Sat, Jan 24, 2015 at 3:32 AM, Greg Temchenko s...@dicefield.com wrote:

  Hi Madhu,
 Thanks for you response!
 But as I understand it, in this case you select all the data from the Cassandra
 table. I don't want to do that as it can be huge. I just want to look up some
 ids in the table. So it isn't clear to me how I can put some values
 from the streamRDD into the Cassandra query (into the where method).

 Greg



 On 1/23/15 1:11 AM, madhu phatak wrote:

 Hi,
 Seems like you want to get the username for a given user id. You can use
 transform on the Kafka stream to join two RDDs. The pseudo-code looks like
 this:

  val joinRDD = kafkaStream.transform { streamRDD =>
    streamRDD.map(value => (value._2, value._1)) join with
    (ctable.select("userid", "username"))
  }

 On Fri, Jan 23, 2015 at 10:12 AM, Greg Temchenko s...@dicefield.com
 wrote:

  Hi there,

 I think I have a basic question, but I'm sort of stuck with figuring out
 how to approach it, and I thought someone could point me to the right
 direction.

 I'd like pull some data from Cassandra based on values received from an
 input stream. Something like

 val ctable = ssc.cassandraTable("keyspace", "users")
 val userNames = kafkaStream.flatMap {
   case (key, userId) => {
     val userName = ctable.select("user_name").where("userid = ?", userId).toArray(0).get[String](0)
     Some(userId, userName)
   }
 }


 While the Cassandra query works in Spark shell, it throws an exception
 when I used it inside flatMap:

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent
 failure: Lost task 0.0 in stage 46.0 (TID 35, localhost):
 java.lang.NullPointerException:
 org.apache.spark.rdd.RDD.init(RDD.scala:125)

 com.datastax.spark.connector.rdd.CassandraRDD.init(CassandraRDD.scala:49)

 com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)

 com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)

 My understanding is that I cannot produce an RDD (Cassandra results)
 inside another RDD. But how should I approach the problem instead?



 Thanks,

 --
 Greg




  --
  Regards,
 Madhukara Phatak
 http://www.madhukaraphatak.com



 --
 Greg




-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: save a histogram to a file

2015-01-23 Thread madhu phatak
Hi,
The histogram method returns normal Scala types, not an RDD, so you will not
have saveAsTextFile.
You can use the makeRDD method to make an RDD out of the data and save it with
saveAsObjectFile:

val hist = a.histogram(10)
val histRDD = sc.makeRDD(hist)
histRDD.saveAsObjectFile(path)
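
Another small option, if a plain text file is wanted: pair each bucket edge with
its count and save that in one go (the output path is illustrative):

val (buckets, counts) = a.histogram(10)
// zip pairs each bucket's lower edge with its count (the final upper edge is dropped)
val rows = buckets.zip(counts).map { case (edge, count) => s"$edge\t$count" }
sc.parallelize(rows).saveAsTextFile("/tmp/histogram-output")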


On Fri, Jan 23, 2015 at 5:37 AM, SK skrishna...@gmail.com wrote:

 Hi,
 histogram() returns an object that is a  pair of Arrays. There appears to
 be
 no saveAsTextFile() for this paired object.

 Currently I am using the following to save the output to a file:

 val hist = a.histogram(10)

 val arr1 = sc.parallelize(hist._1).saveAsTextFile(file1)
 val arr2 = sc.parallelize(hist._2).saveAsTextFile(file2)

 Is there a simpler way to save the histogram() result to a file?

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/save-a-histogram-to-a-file-tp21324.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: DAG info

2015-01-03 Thread madhu phatak
Hi,
You can turn off these messages using log4j.properties.
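
For reference, a minimal sketch of quieting the INFO output programmatically, as
an alternative to editing conf/log4j.properties:

import org.apache.log4j.{Level, Logger}

// Raise the log level for Spark's (and Akka's) noisy packages.
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)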

On Fri, Jan 2, 2015 at 1:51 PM, Robineast robin.e...@xense.co.uk wrote:

 Do you have some example code of what you are trying to do?

 Robin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/DAG-info-tp20940p20941.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: broadcasting object issue

2014-12-22 Thread madhu phatak
Hi,
 Just ran your code in the spark-shell. If you replace

 val bcA = sc.broadcast(a)


with

val bcA = sc.broadcast(new B().getA)


it seems to work. Not sure why.


On Tue, Dec 23, 2014 at 9:12 AM, Henry Hung ythu...@winbond.com wrote:

  Hi All,



 I have a problem with broadcasting a serialize class object that returned
 by another not-serialize class, here is the sample code:



 class A extends java.io.Serializable {

 def halo(): String = "halo"

 }



 class B {

 def getA() = new A

 }



 val list = List(1)



 val b = new B

 val a = b.getA



 val p = sc.parallelize(list)



 // this will fail

 val bcA = sc.broadcast(a)

 p.map(x => {

 bcA.value.halo()

 })



 // this will success

 val bcA = sc.broadcast(new A)

 p.map(x => {

 bcA.value.halo()

 })





 A is a serializable class, whereas B is not serializable.

 If I create a new object A through B's method getA(), the map process
 fails with the exception “org.apache.spark.SparkException: Task not
 serializable, Caused by: java.io.NotSerializableException: $iwC$$iwC$B”



 I don’t know why Spark checks whether class B is serializable or not. Is
 there a way to code around this?



 Best regards,

 Henry





-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: Joins in Spark

2014-12-22 Thread madhu phatak
Hi,
 You can map your vertices RDD as follows:

val pairVertices = verticesRDD.map(vertex => (vertex, null))

The above gives you a pair RDD. After the join, make sure that you remove the
superfluous null values.
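
A small sketch of the follow-up join, assuming verticesRDD: RDD[Long] and
edges: RDD[(Long, Long)] (names and types are illustrative):

import org.apache.spark.SparkContext._   // pair-RDD functions on older Spark versions

val pairVertices = verticesRDD.map(v => (v, null))

// Join on the vertex id, then drop the superfluous null from the result.
val verticesWithEdges = pairVertices
  .join(edges)
  .map { case (src, (_, dst)) => (src, dst) }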

On Tue, Dec 23, 2014 at 10:36 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 I have two RDDs, vertices and edges. Vertices is a plain RDD and edges is a
 pair RDD. I want to take a three-way join of these two. Joins work only when
 both RDDs are pair RDDs, right? So how am I supposed to take a three-way join
 of these RDDs?

 Thank You




-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: reading files recursively using spark

2014-12-19 Thread madhu phatak
Hi,
You can use Hadoop's FileInputFormat API and Spark's newAPIHadoopFile to get
recursion. For more on the topic, see
http://stackoverflow.com/questions/8114579/using-fileinputformat-addinputpaths-to-recursively-add-hdfs-path
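
For reference, a hedged sketch of that route using the Hadoop recursion flag
(the input path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration()
// Tell FileInputFormat to descend into sub-directories.
conf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

val lines = sc.newAPIHadoopFile(
    "hdfs:///data/root-dir",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    conf)
  .map(_._2.toString)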

On Fri, Dec 19, 2014 at 4:50 PM, Sean Owen so...@cloudera.com wrote:

 How about using the HDFS API to create a list of all the directories
 to read from, and passing them as a comma-joined string to
 sc.textFile?

 On Fri, Dec 19, 2014 at 11:13 AM, Hafiz Mujadid
 hafizmujadi...@gmail.com wrote:
  Hi experts!
 
  What is an efficient way to read all files using Spark from a directory and
  its sub-directories as well? Currently I move all files from the directory and
  its sub-directories into another temporary directory and then read them all
  using the sc.textFile method. But I want a method that saves the cost of
  moving them to a temporary directory.
 
  Thanks
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/reading-files-recursively-using-spark-tp20782.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 




-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: SchemaRDD.sample problem

2014-12-18 Thread madhu phatak
Hi,
Can you clean up the code a little bit? It's hard to read what's going on.
You can use Pastebin or a Gist to share the code.

On Wed, Dec 17, 2014 at 3:58 PM, Hao Ren inv...@gmail.com wrote:

 Hi,

 I am using SparkSQL on 1.2.1 branch. The problem comes froms the following
 4-line code:

 val t1: SchemaRDD = hiveContext hql "select * from product where is_new = 0"
 val tb1: SchemaRDD = t1.sample(withReplacement = false, fraction = 0.05)
 tb1.registerTempTable("t1_tmp")
 (hiveContext sql "select count(*) from t1_tmp where is_new = 1").collect() foreach println

 We know that t1 contains only rows whose is_new field is zero.
 After sampling t1 by taking 5% of the rows, the sampled table should
 always contain only rows where is_new = 0. However, line 4 gives a number
 around 5 by chance. That means there are some rows where is_new = 1 in the
 sampled table, which is not logically possible.

 I am not sure SchemaRDD.sample is doing its work correctly.

 Any idea ?

 Hao



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: When will spark 1.2 released?

2014-12-18 Thread madhu phatak
It’s on Maven Central already http://search.maven.org/#browse%7C717101892

On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com 
vboylin1...@gmail.com wrote:

 Hi,
   Does anyone know when Spark 1.2 will be released? 1.2 has many great features
 that we can't wait for. :-)

 Sincerely,
 Lin Wukang


 Sent from NetEase Mail Master



-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com