Re: Spark SQL 1.5.2 missing JDBC driver for PostgreSQL?

2015-12-22 Thread Benjamin Kim
Hi Stephen,

I forgot to mention that I added the lines below to spark-defaults.conf on 
the node running the Spark SQL Thrift JDBC/ODBC Server. Then I 
restarted it.

spark.driver.extraClassPath=/usr/share/java/postgresql-9.3-1104.jdbc41.jar
spark.executor.extraClassPath=/usr/share/java/postgresql-9.3-1104.jdbc41.jar

I read in another thread that this would work. I was able to create the table 
and could see it in my SHOW TABLES list. But, when I try to query the table, I 
get the same error. It looks like I’m getting close.

Are there any other things that I have to do that you can think of?

Thanks,
Ben
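
The "No suitable driver found" error in this thread is raised by java.sql.DriverManager when no registered driver accepts the URL. A minimal sketch of a classpath check follows; it is an illustration, not something posted in the thread. The object and method names are hypothetical, and org.postgresql.Driver is assumed to be the driver class shipped in the jar mentioned above. It only inspects the JVM it runs in, so on a cluster it would have to be run on the driver and inside a task to say anything about the executors.

```scala
import java.sql.DriverManager
import scala.collection.JavaConverters._
import scala.util.Try

object DriverCheck {
  // True if the named class can be loaded from the current classpath.
  def isLoadable(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  // Class names of the JDBC drivers currently registered with DriverManager.
  def registeredDrivers(): List[String] =
    DriverManager.getDrivers.asScala.map(_.getClass.getName).toList

  def main(args: Array[String]): Unit = {
    // Assumed driver class for the PostgreSQL jar; adjust for other databases.
    val driverClass = "org.postgresql.Driver"
    println(s"$driverClass loadable: ${isLoadable(driverClass)}")
    println(s"registered drivers: ${registeredDrivers().mkString(", ")}")
  }
}
```

If the class is loadable on the Thrift server node but not inside a task, that would be consistent with CREATE TABLE succeeding while queries fail.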


> On Dec 22, 2015, at 6:25 PM, Stephen Boesch wrote:
> 
> The PostgreSQL JDBC driver needs to be added to the classpath of your Spark 
> workers. You can search for how to do that (there are multiple ways).
> 
> 2015-12-22 17:22 GMT-08:00 b2k70:
> I see in the Spark SQL documentation that a temporary table can be created
> directly on top of a remote PostgreSQL table.
> 
> CREATE TEMPORARY TABLE 
> USING org.apache.spark.sql.jdbc
> OPTIONS (
> url "jdbc:postgresql:///",
> dbtable "impressions"
> );
> When I run this against our PostgreSQL server, I get the following error.
> 
> Error: java.sql.SQLException: No suitable driver found for
> jdbc:postgresql:/// (state=,code=0)
> 
> Can someone help me understand why this is?
> 
> Thanks, Ben
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-5-2-missing-JDBC-driver-for-PostgreSQL-tp25773.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: Spark SQL 1.5.2 missing JDBC driver for PostgreSQL?

2015-12-22 Thread Benjamin Kim
Stephen,

Let me confirm: I just need to propagate the settings I put in
spark-defaults.conf to all the worker nodes? Do I need to do the same with
the PostgreSQL driver jar file too? If so, is there a way to have it read
from HDFS rather than copying it out to the cluster manually?

Thanks for your help,
Ben

On Tuesday, December 22, 2015, Stephen Boesch <java...@gmail.com> wrote:

> Hi Benjamin, yes: adding the driver to the Thrift server is enough for the
> CREATE TABLE to work. But querying is performed by the workers, so you need
> to add it to the classpath of all nodes for reads to work.


Re: Spark SQL 1.5.2 missing JDBC driver for PostgreSQL?

2015-12-25 Thread Benjamin Kim
Hi Chris,

I did what you did. It works for me now! Thanks for your help.

Have a Merry Christmas!

Cheers,
Ben


> On Dec 25, 2015, at 6:41 AM, Chris Fregly <ch...@fregly.com> wrote:
> 
> Configuring JDBC drivers with Spark is a bit tricky as the JDBC driver needs 
> to be on the Java System Classpath per this 
> <http://spark.apache.org/docs/latest/sql-programming-guide.html#troubleshooting>
>  troubleshooting section in the Spark SQL programming guide.
> 
> Here 
> <https://github.com/fluxcapacitor/pipeline/blob/master/bin/start-hive-thriftserver.sh>
>  is an example hive-thrift-server start script from my Spark-based reference 
> pipeline project.  Here 
> <https://github.com/fluxcapacitor/pipeline/blob/master/bin/pipeline-spark-sql.sh>
>  is an example script that decorates the out-of-the-box spark-sql command to 
> use the MySQL JDBC driver.
> 
> These scripts explicitly set --jars to $SPARK_SUBMIT_JARS which is defined 
> here 
> <https://github.com/fluxcapacitor/pipeline/blob/master/config/bash/.profile#L144>
>  and here 
> <https://github.com/fluxcapacitor/pipeline/blob/master/config/bash/.profile#L87>
>  and includes the path to the local MySQL JDBC driver.  This approach is 
> described here 
> <http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management>
>  in the Spark docs that describe the advanced spark-submit options.  
> 
> Any jar specified with --jars will be passed to each worker node in the 
> cluster - specifically in the work directory for each SparkContext for 
> isolation purposes.
> 
> Cleanup of these jars on the worker nodes is handled by YARN automatically, 
> and by Spark Standalone per the spark.worker.cleanup.appDataTtl config param.
> 
> The Spark SQL programming guide says to use SPARK_CLASSPATH for this purpose, 
> but I couldn't get that to work, so I'm sticking to the 
> --jars approach used in my examples.

Re: Spark SQL 1.5.2 missing JDBC driver for PostgreSQL?

2015-12-26 Thread Benjamin Kim
Chris,

I have a question about your setup. Does it allow the same usage of 
Cassandra/HBase data sources? Can I create a table that links to them and can 
be used by Spark SQL? I ask because I see the Cassandra connector package 
included in your script.

Thanks,
Ben


Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
 '',
   `event_id` string COMMENT '',
   `impression_id` string COMMENT '',
   `diagnostic_data` string COMMENT '',
   `user_profile_mapping_source` string COMMENT '',
   `latitude` float COMMENT '',
   `longitude` float COMMENT '',
   `area_code` int COMMENT '',
   `gmt_offset` string COMMENT '',
   `in_dst` string COMMENT '',
   `proxy_type` string COMMENT '',
   `mobile_carrier` string COMMENT '',
   `pop` string COMMENT '',
   `hostname` string COMMENT '',
   `profile_ttl` string COMMENT '',
   `timestamp_iso` string COMMENT '',
   `reference_id` string COMMENT '',
   `identity_organization` string COMMENT '',
   `identity_method` string COMMENT '',
   `mappable_id` string COMMENT '',
   `profile_expires` string COMMENT '',
   `video_player_iframed` int COMMENT '',
   `video_player_in_view` int COMMENT '',
   `video_player_width` int COMMENT '',
   `video_player_height` int COMMENT '',
   `host_domain` string COMMENT '',
   `browser_type` string COMMENT '',
   `browser_device_cat` string COMMENT '',
   `browser_family` string COMMENT '',
   `browser_name` string COMMENT '',
   `browser_version` string COMMENT '',
   `browser_major_version` string COMMENT '',
   `browser_minor_version` string COMMENT '',
   `os_family` string COMMENT '',
   `os_name` string COMMENT '',
   `os_version` string COMMENT '',
   `os_major_version` string COMMENT '',
   `os_minor_version` string COMMENT '')
 PARTITIONED BY (`dt` timestamp)
 STORED AS PARQUET;

Thanks,
Ben


> On Jun 3, 2016, at 8:47 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> Hang on, are you saving this as a new table?
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
Mich,

I am using .withColumn to add another column “dt” that is a reformatted version 
of an existing column “timestamp”. The partition column is “dt”.

We are using Spark 1.6.0 in CDH 5.7.0.

Thanks,
Ben

> On Jun 3, 2016, at 10:33 AM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> What version of Spark are you using?
> 
> On 3 June 2016 at 17:51, Mich Talebzadeh <mich.talebza...@gmail.com 
> <mailto:mich.talebza...@gmail.com>> wrote:
> OK, what is the new column called? You are basically adding a new column to 
> an already existing table.
> 
> On 3 June 2016 at 17:04, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> The table already exists.
> 
>  CREATE EXTERNAL TABLE `amo_bi_events`(
>    `event_type` string COMMENT '',
>    `timestamp` string COMMENT '',
>    `event_valid` int COMMENT '',
>    `event_subtype` string COMMENT '',
>    `user_ip` string COMMENT '',
>    `user_id` string COMMENT '',
>    `cookie_status` string COMMENT '',
>    `profile_status` string COMMENT '',
>    `user_status` string COMMENT '',
>    `previous_timestamp` string COMMENT '',
>    `user_agent` string COMMENT '',
>    `referer` string COMMENT '',
>    `uri` string COMMENT '',
>    `request_elapsed` bigint COMMENT '',
>    `browser_languages` string COMMENT '',
>    `acamp_id` int COMMENT '',
>    `creative_id` int COMMENT '',
>    `location_id` int COMMENT '',
>    `pcamp_id` int COMMENT '',
>    `pdomain_id` int COMMENT '',
>    `country` string COMMENT '',
>    `region` string COMMENT '',
>    `dma` int COMMENT '',
>    `city` string COMMENT '',
>    `zip` string COMMENT '',
>    `isp` string COMMENT '',
>    `line_speed` string COMMENT '',
>    `gender` string COMMENT '',
>    `year_of_birth` int COMMENT '',
>    `behaviors_read` string COMMENT '',
>    `behaviors_written` string COMMENT '',
>    `key_value_pairs` string COMMENT '',
>    `acamp_candidates` int COMMENT '',
>    `tag_format` string COMMENT '',
>    `optimizer_name` string COMMENT '',
>    `optimizer_version` string COMMENT '',

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
e  string
> timestamp  string
> event_valid  int
> event_subtype  string
> user_ip  string
> user_id  string
> cookie_status  string
> profile_status  string
> user_status  string
> previous_timestamp  string
> user_agent  string
> referer  string
> uri  string
> request_elapsed  bigint
> browser_languages  string
> acamp_id  int
> creative_id  int
> location_id  int
> pcamp_id  int
> pdomain_id  int
> country  string
> region  string
> dma  int
> city  string
> zip  string
> isp  string
> line_speed  string
> gender  string
> year_of_birth  int
> behaviors_read  string
> behaviors_written  string
> key_value_pairs  string
> acamp_candidates  int
> tag_format  string
> optimizer_name  string
> optimizer_version  string
> optimizer_ip  string
> pixel_id  int
> video_id  string
> video_network_id  int
> video_time_watched  bigint
> video_percentage_watched  int
> conversion_valid_sale  int
> conversion_sale_amount  float
> conversion_commission_amount  float
> conversion_step  int
> conversion_currency  string
> conversion_attribution  int
> conversion_offer_id  string
> custom_info  string
> frequency  int
> recency_seconds  int
> cost  float
> revenue  float
> optimizer_acamp_id  int
> optimizer_creative_id  int
> optimizer_ecpm  float
> event_id  string
> impression_id  string
> diagnostic_data  string
> user_profile_mapping_source  string
> latitude  float
> longitude  float
> area_code  int
> gmt_offset  string
> in_dst  string
> proxy_type  string
> mobile_carrier  string
> pop  string
> hostname  string
> profile_ttl  string
> timestamp_iso  string
> reference_id  string
> identity_organization  string
> identity_method  string
> mappable_id  string
> profile_expires  string
> video_player_iframed  int
> video_player_in_view  int
> video_player_width  int
> video_player_height  int
> host_domain  string
> browser_type  string
> browser_device_cat  string
> browser_family  string
> browser_name  string
> browser_version  string
> browser_major_version  string
> browser_minor_version  string
> os_family  string
> os_name  string
> os_version  string
> os_major_version  string
> os_minor_version  string
> # Partition Information
> # col_name  data_type   comment
> dt  timestamp
> # Detailed Table Information
> Database:   test
> Owner:  hduser
> CreateTime: Fri Jun 03 19:03:20 BST 2016
> LastAccessTime: UNKNOWN
> Retention:  0
> Location:   hdfs://rhes564:9000/user/hive/warehouse/test.db/amo_bi_events
> Table Type: EXTERNAL_TABLE
> Table Parameters:
> EXTERNAL  TRUE
> transient_lastDdlTime   1464977000
> # Storage Information
> SerDe Library:  org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
> InputFormat:    org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
> OutputFormat:   org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
> Compressed: No
> Num Buckets:    -1
> Bucket Columns: []
> Sort Columns:   []
> Storage Desc Params:
> serialization.format  1
> Time taken: 0.397 seconds, Fetched: 124 row(s)
> 
> So effectively that table is partitioned by dt in no time.
> 
> What I don't understand is whether that table is already partitioned, since 
> you said the table already exists.
> 

Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
Does anyone know how to save data in a DataFrame to a table partitioned using 
an existing column reformatted into a derived column?

val partitionedDf = df.withColumn("dt",
  concat(substring($"timestamp", 1, 10), lit(" "),
    substring($"timestamp", 12, 2), lit(":00")))

sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
partitionedDf.write
  .mode(SaveMode.Append)
  .partitionBy("dt")
  .saveAsTable("ds.amo_bi_events")

I am getting an ArrayIndexOutOfBoundsException. There are 83 columns in the 
destination table, but after adding the derived column the error reports 84. 
I assumed that the column used for the partition would not be counted.

Can someone please help?

Thanks,
Ben
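
To make the derived column concrete, here is a small sketch of the string that the withColumn expression produces for one row. It runs without Spark; the object and helper names are hypothetical, and the sample timestamp is an assumed format, since the post does not show real values. The helper imitates Spark SQL's 1-based substring for positive positions only.

```scala
object DerivedPartition {
  // Imitates Spark SQL's 1-based substring(str, pos, len) for positive pos.
  def sqlSubstring(s: String, pos: Int, len: Int): String = {
    val start = math.max(pos - 1, 0)
    val end = math.min(start + len, s.length)
    if (start >= s.length) "" else s.substring(start, end)
  }

  // Same shape as the withColumn call: date part, a space, the hour, ":00".
  def hourBucket(timestamp: String): String =
    sqlSubstring(timestamp, 1, 10) + " " + sqlSubstring(timestamp, 12, 2) + ":00"

  def main(args: Array[String]): Unit = {
    // Hypothetical input value; the real data format is not shown in the post.
    println(hourBucket("2016-06-03T14:13:05Z")) // prints "2016-06-03 14:00"
  }
}
```

Under that assumption each row lands in an hourly bucket, and the value is a string, while the destination table declares dt as a timestamp.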



Data Integrity / Model Quality Monitoring

2016-06-17 Thread Benjamin Kim
Has anyone run into this requirement?

We need to track data integrity and model quality metrics of outcomes so 
that we can gauge both whether the incoming data is healthy and whether the 
models run against it are still performing and not giving faulty results. A 
nice-to-have would be to graph these over time somehow. Since we are using 
Cloudera Manager, graphing in there would be a plus.

Any advice or suggestions would be welcome.

Thanks,
Ben



Model Quality Tracking

2016-06-24 Thread Benjamin Kim
Has anyone implemented a way to track the performance of a data model? We 
currently have an algorithm that does record linkage and emits statistics on 
matches, non-matches, and partial matches, with reason codes for why a record 
didn’t match accurately. This way, we will know if something goes wrong down 
the line. All of this goes into directories of CSV files partitioned by 
datetime, with a Hive table on top. Then we can run analytical queries and 
even do charting if need be. All of this is very manual, so I was wondering 
if there is a package, software, built-in module, etc. that would do this 
automatically. Since we are using CDH, it would be great if these graphs 
could be integrated into Cloudera Manager too.

Any advice is welcome.

Thanks,
Ben
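
For readers picturing the manual setup described above, a rough sketch of the tallying step might look like the following. Everything in it is an assumption made for illustration: the Outcome fields, the status and reason values, and the CSV column order are hypothetical, not the poster's actual schema.

```scala
object LinkageStats {
  // Hypothetical record shape: one row per linkage attempt.
  final case class Outcome(hour: String, status: String, reason: String)

  // Count attempts per (hour, status, reason) and render sorted CSV lines.
  def toCsvLines(outcomes: Seq[Outcome]): Seq[String] =
    outcomes
      .groupBy(o => (o.hour, o.status, o.reason))
      .toSeq
      .map { case ((hour, status, reason), rows) => (hour, status, reason, rows.size) }
      .sorted
      .map { case (hour, status, reason, n) => s"$hour,$status,$reason,$n" }

  def main(args: Array[String]): Unit = {
    // Made-up sample data, only to exercise the function.
    val sample = Seq(
      Outcome("2016-06-24 10:00", "match", ""),
      Outcome("2016-06-24 10:00", "match", ""),
      Outcome("2016-06-24 10:00", "partial", "zip_mismatch"),
      Outcome("2016-06-24 11:00", "non_match", "no_candidate")
    )
    toCsvLines(sample).foreach(println)
  }
}
```

Lines like these, written under one directory per datetime, are the kind of input a Hive table over CSV could then query.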





Re: spark 1.6.0 connect to hive metastore

2016-02-09 Thread Benjamin Kim
I got the same problem when I added the Phoenix plugin jar in the driver and 
executor extra classpaths. Do you have those set too?

> On Feb 9, 2016, at 1:12 PM, Koert Kuipers wrote:
> 
> yes its not using derby i think: i can see the tables in my actual hive 
> metastore.
> 
> i was using a symlink to /etc/hive/conf/hive-site.xml for my hive-site.xml 
> which has a lot more stuff than just hive.metastore.uris
> 
> let me try your approach
> 
> 
> 
> On Tue, Feb 9, 2016 at 3:57 PM, Alexandr Dzhagriev wrote:
> I'm using Spark 1.6.0 and Hive 1.2.1, and there is just one property in the 
> hive-site.xml: hive.metastore.uris. Works for me. Can you check in the logs 
> that when the HiveContext is created it connects to the correct URI and 
> doesn't use Derby?
> 
> Cheers, Alex.
> 
> On Tue, Feb 9, 2016 at 9:39 PM, Koert Kuipers wrote:
> hey thanks. hive-site is on classpath in conf directory
> 
> i currently got it to work by changing this hive setting in hive-site.xml:
> hive.metastore.schema.verification=true
> to
> hive.metastore.schema.verification=false
> 
> this feels like a hack, because schema verification is a good thing i would 
> assume?
> 
> On Tue, Feb 9, 2016 at 3:25 PM, Alexandr Dzhagriev wrote:
> Hi Koert,
> 
> As far as I can see you are using derby:
> 
>  Using direct SQL, underlying DB is DERBY
> 
> not mysql, which is used for the metastore. That means, spark couldn't find 
> hive-site.xml on your classpath. Can you check that, please?
> 
> Thanks, Alex.
> 
> On Tue, Feb 9, 2016 at 8:58 PM, Koert Kuipers wrote:
> has anyone successfully connected to hive metastore using spark 1.6.0? i am 
> having no luck. worked fine with spark 1.5.1 for me. i am on cdh 5.5 and 
> launching spark with yarn.
> 
> this is what i see in logs:
> 16/02/09 14:49:12 INFO hive.metastore: Trying to connect to metastore with 
> URI thrift://metastore.mycompany.com:9083 
> 
> 16/02/09 14:49:12 INFO hive.metastore: Connected to metastore.
> 
> and then a little later:
> 
> 16/02/09 14:49:34 INFO hive.HiveContext: Initializing execution hive, version 
> 1.2.1
> 16/02/09 14:49:34 INFO client.ClientWrapper: Inspected Hadoop version: 
> 2.6.0-cdh5.4.4
> 16/02/09 14:49:34 INFO client.ClientWrapper: Loaded 
> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.4.4
> 16/02/09 14:49:34 WARN conf.HiveConf: HiveConf of name 
> hive.server2.enable.impersonation does not exist
> 16/02/09 14:49:35 INFO metastore.HiveMetaStore: 0: Opening raw store with 
> implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
> 16/02/09 14:49:35 INFO metastore.ObjectStore: ObjectStore, initialize called
> 16/02/09 14:49:35 INFO DataNucleus.Persistence: Property 
> hive.metastore.integral.jdo.pushdown unknown - will be ignored
> 16/02/09 14:49:35 INFO DataNucleus.Persistence: Property 
> datanucleus.cache.level2 unknown - will be ignored
> 16/02/09 14:49:35 WARN DataNucleus.Connection: BoneCP specified but not 
> present in CLASSPATH (or one of dependencies)
> 16/02/09 14:49:35 WARN DataNucleus.Connection: BoneCP specified but not 
> present in CLASSPATH (or one of dependencies)
> 16/02/09 14:49:37 WARN conf.HiveConf: HiveConf of name 
> hive.server2.enable.impersonation does not exist
> 16/02/09 14:49:37 INFO metastore.ObjectStore: Setting MetaStore object pin 
> classes with 
> hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
> 16/02/09 14:49:38 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
> "embedded-only" so does not have its own datastore table.
> 16/02/09 14:49:38 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" 
> so does not have its own datastore table.
> 16/02/09 14:49:40 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
> "embedded-only" so does not have its own datastore table.
> 16/02/09 14:49:40 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" 
> so does not have its own datastore table.
> 16/02/09 14:49:40 INFO metastore.MetaStoreDirectSql: Using direct SQL, 
> underlying DB is DERBY
> 16/02/09 14:49:40 INFO metastore.ObjectStore: Initialized ObjectStore
> java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate 
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>   at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>   at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:194)
>   at 
> 
Re: Is there a any plan to develop SPARK with c++??

2016-02-03 Thread Benjamin Kim
Hi DaeJin,

The closest thing I can think of is this.

https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

Cheers,
Ben

> On Feb 3, 2016, at 9:49 PM, DaeJin Jung  wrote:
> 
> hello everyone,
> I have a short question.
> 
> I would like to improve the performance of the Spark framework using Intel 
> native instructions or similar. So I wonder if there are any plans to 
> develop Spark in C++ or C in the near future.
> 
> Please let me know if you have any information.
> 
> Best Regards,
> Daejin Jung



Re: [ANNOUNCE] New SAMBA Package = Spark + AWS Lambda

2016-02-02 Thread Benjamin Kim
Hi David,

My company uses Lambda to do simple data moving and processing with Python 
scripts. I can see that using Spark instead for the data processing would make 
it into a real production-level platform. Does this pave the way to replacing 
the need for a pre-instantiated cluster in AWS or purchased hardware in a 
datacenter? If so, this would be a great efficiency gain and an easier 
entry point for Spark usage. I hope the vision is to get rid of all cluster 
management when using Spark.

Thanks,
Ben


> On Feb 1, 2016, at 4:23 AM, David Russell wrote:
> 
> Hi all,
> 
> Just sharing news of the release of a newly available Spark package, SAMBA:
> 
> https://github.com/onetapbeyond/lambda-spark-executor
> 
> SAMBA is an Apache Spark package offering seamless integration with the AWS 
> Lambda compute service for Spark batch and streaming applications on the JVM.
> 
> Within traditional Spark deployments RDD tasks are executed using fixed 
> compute resources on worker nodes within the Spark cluster. With SAMBA, 
> application developers can delegate selected RDD tasks to execute using 
> on-demand AWS Lambda compute infrastructure in the cloud.
> 
> Not unlike the recently released ROSE package that extends the capabilities 
> of traditional Spark applications with support for CRAN R analytics, SAMBA 
> provides another (hopefully) useful extension for Spark application 
> developers on the JVM.
> 
> SAMBA Spark Package: https://github.com/onetapbeyond/lambda-spark-executor 
> 
> ROSE Spark Package: https://github.com/onetapbeyond/opencpu-spark-executor 
> 
> 
> Questions, suggestions, feedback welcome.
> 
> David
> 
> -- 
> "All that is gold does not glitter, Not all those who wander are lost."



Re: Spark with SAS

2016-02-03 Thread Benjamin Kim
You can download the Spark ODBC Driver.

https://databricks.com/spark/odbc-driver-download


> On Feb 3, 2016, at 10:09 AM, Jörn Franke  wrote:
> 
> This could be done through ODBC. Keep in mind that you can run SAS jobs 
> directly on a Hadoop cluster using the SAS Embedded Process engine, or dump 
> some data to a SAS LASR cluster, but you had better ask SAS about this.
> 
>> On 03 Feb 2016, at 18:43, Sourav Mazumder  
>> wrote:
>> 
>> Hi,
>> 
>> Is anyone aware of any work going on for integrating Spark with SAS for 
>> executing queries in Spark?
>> 
>> For example calling Spark Jobs from SAS using Spark SQL through Spark SQL's 
>> JDBC/ODBC library.
>> 
>> Regards,
>> Sourav
> 



Re: SparkOnHBase : Which version of Spark its available

2016-02-17 Thread Benjamin Kim
Ted,

Any idea as to when this will be released?

Thanks,
Ben


> On Feb 17, 2016, at 2:53 PM, Ted Yu  wrote:
> 
> The HBASE JIRA below is for HBase 2.0
> 
> HBase Spark module would be back ported to hbase 1.3.0
> 
> FYI 
> 
> On Feb 17, 2016, at 1:13 PM, Chandeep Singh wrote:
> 
>> HBase-Spark module was added in 1.3
>> 
>> https://issues.apache.org/jira/browse/HBASE-13992 
>> 
>> 
>> http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/
>>  
>> 
>> http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ 
>> 
>> 
>>> On Feb 17, 2016, at 9:44 AM, Divya Gehlot wrote:
>>> 
>>> Hi,
>>> 
>>> Which versions of Spark and HBase is SparkOnHBase integrated with?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Thanks,
>>> Divya 
>> 



Re: S3 Zip File Loading Advice

2016-03-15 Thread Benjamin Kim
Hi Xinh,

I tried to wrap it, but it still didn’t work. I got a 
"java.util.ConcurrentModificationException".

All,

I have been trying and trying with some help from a coworker, but it’s slow 
going. I have been able to gather a list of the S3 files I need to download.

### S3 Lists ###
import scala.collection.JavaConverters._
import java.util.ArrayList
import java.util.zip.{ZipEntry, ZipInputStream}
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ObjectListing, S3ObjectSummary, 
ListObjectsRequest, GetObjectRequest}
import org.apache.commons.io.IOUtils
import org.joda.time.{DateTime, Period}
import org.joda.time.format.DateTimeFormat

val s3Bucket = "amg-events"

val formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH")
var then = DateTime.now()

var files = new ArrayList[String]

//S3 Client and List Object Request
val s3Client = new AmazonS3Client()
val listObjectsRequest = new ListObjectsRequest()
var objectListing: ObjectListing = null

//Your S3 Bucket
listObjectsRequest.setBucketName(s3Bucket)

var now = DateTime.now()
var range = Iterator.iterate(now.minusDays(1))(_.plus(Period.hours(1))).takeWhile(!_.isAfter(now))
range.foreach(ymdh => {
  //Your Folder path or Prefix
  listObjectsRequest.setPrefix(formatter.print(ymdh))

  //Adding s3:// to the paths and adding to a list
  do {
    objectListing = s3Client.listObjects(listObjectsRequest)
    for (objectSummary <- objectListing.getObjectSummaries().asScala) {
      if (objectSummary.getKey().contains(".csv.zip") &&
          objectSummary.getLastModified().after(then.toDate())) {
        //files.add(objectSummary.getKey())
        files.add("s3n://" + s3Bucket + "/" + objectSummary.getKey())
      }
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())
})
then = now

//Creating a Scala List for same
val fileList = files.asScala

//Parallelize the Scala List
val fileRDD = sc.parallelize(fileList)
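
For reference, the hourly prefix generation used in the listing loop can be sketched on its own with java.time instead of Joda-Time. This is only a sketch under assumptions: the object name HourlyPrefixes is made up for illustration, and the yyyy/MM/dd/HH key layout is inferred from the paths mentioned in this thread, not confirmed against the bucket.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical helper, not part of any library.
object HourlyPrefixes {
  // Assumed key layout: one "directory" per hour, e.g. 2016/03/01/00
  private val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")

  // One prefix per hour from start (rounded down to the hour) through end.
  def between(start: LocalDateTime, end: LocalDateTime): List[String] =
    Iterator
      .iterate(start.truncatedTo(ChronoUnit.HOURS))(t => t.plusHours(1))
      .takeWhile(t => !t.isAfter(end))
      .map(t => fmt.format(t))
      .toList
}
```

Each returned string could then be passed to listObjectsRequest.setPrefix in place of formatter.print(ymdh).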

Now, I am trying to go through the list and download each file, unzip each file 
as it comes, and pass the ZipInputStream to the CSV parser. This is where I get 
stuck.

var df: DataFrame = null
for (file <- fileList) {
  val zipfile = s3Client.getObject(new GetObjectRequest(s3Bucket, file)).getObjectContent()
  val zis = new ZipInputStream(zipfile)
  var ze = zis.getNextEntry()
//  val fileDf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(zis)
//  if (df != null) {
//    df = df.unionAll(fileDf)
//  } else {
//    df = fileDf
//  }
}

I don’t know if I am doing it right or not. I also read that parallelizing 
fileList would allow parallel file retrieval. But, I don’t know how to proceed 
from here.

If you can help, I would be grateful.

Thanks,
Ben


> On Mar 9, 2016, at 10:10 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
> 
> Could you wrap the ZipInputStream in a List, since a subtype of 
> TraversableOnce[?] is required?
> 
> case (name, content) => List(new ZipInputStream(content.open))
> 
> Xinh
> 
> On Wed, Mar 9, 2016 at 7:07 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi Sabarish,
> 
> I found a similar posting online where I should use the S3 listKeys. 
> http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd.
>  Is this what you were thinking?
> 
> And, your assumption is correct. The zipped CSV file contains only a single 
> file. I found this posting. 
> http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark. I 
> see how to do the unzipping, but I cannot get it to work when running the 
> code directly.
> 
> ...
> import java.io.{ IOException, FileOutputStream, FileInputStream, File }
> import java.util.zip.{ ZipEntry, ZipInputStream }
> import org.apache.spark.input.PortableDataStream
> 
> sc.hadoopConfiguration.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
> 
> val zipFile = 
> "s3n://events/2016/03/01/00/event-20160301.00-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip"
> val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String, 
> content: PortableDataStream) => new ZipInputStream(content.open) }
> 
> <console>:95: error: type mismatch;
>  found   : java.util.zip.ZipInputStream
&

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Ted,

That’s great! I didn’t know. I will proceed with it as you said.

Thanks,
Ben

> On Mar 13, 2016, at 12:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Benjamin:
> Since hbase-spark is in its own module, you can pull the whole hbase-spark 
> subtree into hbase 1.0 root dir and add the following to root pom.xml:
> <module>hbase-spark</module>
> 
> Then you would be able to build the module yourself.
> 
> hbase-spark module uses APIs which are compatible with hbase 1.0
> 
> Cheers
> 
> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi Ted,
> 
> I see that you’re working on the hbase-spark module for hbase. I recently 
> packaged the SparkOnHBase project and gave it a test run. It works like a 
> charm on CDH 5.4 and 5.5. All I had to do was add 
> /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “--jars 
> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the easy-to-use 
> HBaseContext for HBase operations. Now, I want to use the latest in 
> Dataframes. Since the new functionality is only in the hbase-spark module, I 
> want to know how to get it and package it for CDH 5.5, which still uses HBase 
> 1.0.0. Can you tell me what version of hbase master is still backwards 
> compatible?
> 
> By the way, we are using Spark 1.6 if it matters.
> 
> Thanks,
> Ben
> 
>> On Feb 10, 2016, at 2:34 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>> 
>> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>> 
>> Cheers
>> 
>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>> + Spark-Dev
>> 
>> For a Spark job on YARN accessing an HBase table, I added all HBase client 
>> jars to spark.yarn.dist.files. When launching the container (i.e. the 
>> executor), the NodeManager does localization and brings all HBase client 
>> jars into the executor CWD, but the executor tasks still fail with a 
>> ClassNotFoundException for HBase client classes. When I checked 
>> launch_container.sh, the classpath does not have $PWD/*, and hence all the 
>> HBase client jars are ignored.
>> 
>> Is spark.yarn.dist.files not meant for adding jars to the executor classpath?
>> 
>> Thanks,
>> Prabhu Joseph 
>> 
>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>> Hi All,
>> 
>>  When i do count on a Hbase table from Spark Shell which runs as yarn-client 
>> mode, the job fails at count().
>> 
>> MASTER=yarn-client ./spark-shell
>> 
>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, 
>> TableName}
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>  
>> val conf = HBaseConfiguration.create()
>> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>> 
>> val hBaseRDD = sc.newAPIHadoopRDD(conf, 
>> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
>> hBaseRDD.count()
>> 
>> 
>> Tasks throw the exception below; the actual exception is swallowed because 
>> of JDK bug JDK-7172206. After installing the HBase client on all NodeManager 
>> machines, the Spark job ran fine. So I confirmed that the issue is with the 
>> executor classpath.
>> 
>> But i am searching for some other way of including hbase jars in spark 
>> executor classpath instead of installing hbase client on all NM machines. 
>> Tried adding all hbase jars in spark.yarn.dist.files , NM logs shows that it 
>> localized all hbase jars, still the job fails. Tried 
>> spark.executor.extraClasspath, still the job fails.
>> 
>> Is there any way we can access hbase from Executor without installing 
>> hbase-client on all machines.
>> 
>> 
>> 16/02/09 02:34:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
>> prabhuFS1): java.lang.IllegalStateException: unread block data
>> at 
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2428)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>> at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at 
>> java.io.ObjectInputStream.readSerialData(ObjectI

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Hi Ted,

I see that you’re working on the hbase-spark module for hbase. I recently 
packaged the SparkOnHBase project and gave it a test run. It works like a charm 
on CDH 5.4 and 5.5. All I had to do was add 
/opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “--jars 
/path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the easy-to-use 
HBaseContext for HBase operations. Now, I want to use the latest in Dataframes. 
Since the new functionality is only in the hbase-spark module, I want to know 
how to get it and package it for CDH 5.5, which still uses HBase 1.0.0. Can you 
tell me what version of hbase master is still backwards compatible?

By the way, we are using Spark 1.6 if it matters.

Thanks,
Ben

> On Feb 10, 2016, at 2:34 AM, Ted Yu  wrote:
> 
> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
> 
> Cheers
> 
> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph wrote:
> + Spark-Dev
> 
> For a Spark job on YARN accessing an HBase table, I added all HBase client 
> jars to spark.yarn.dist.files. When launching the container (i.e. the 
> executor), the NodeManager does localization and brings all HBase client 
> jars into the executor CWD, but the executor tasks still fail with a 
> ClassNotFoundException for HBase client classes. When I checked 
> launch_container.sh, the classpath does not have $PWD/*, and hence all the 
> HBase client jars are ignored.
> 
> Is spark.yarn.dist.files not meant for adding jars to the executor classpath?
> 
> Thanks,
> Prabhu Joseph 
> 
> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph wrote:
> Hi All,
> 
>  When i do count on a Hbase table from Spark Shell which runs as yarn-client 
> mode, the job fails at count().
> 
> MASTER=yarn-client ./spark-shell
> 
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, 
> TableName}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>  
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE,"spark")
> 
> val hBaseRDD = sc.newAPIHadoopRDD(conf, 
> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
> hBaseRDD.count()
> 
> 
> Tasks throw the exception below; the actual exception is swallowed because 
> of JDK bug JDK-7172206. After installing the HBase client on all NodeManager 
> machines, the Spark job ran fine. So I confirmed that the issue is with the 
> executor classpath.
> 
> But i am searching for some other way of including hbase jars in spark 
> executor classpath instead of installing hbase client on all NM machines. 
> Tried adding all hbase jars in spark.yarn.dist.files , NM logs shows that it 
> localized all hbase jars, still the job fails. Tried 
> spark.executor.extraClasspath, still the job fails.
> 
> Is there any way we can access hbase from Executor without installing 
> hbase-client on all machines.
> 
> 
> 16/02/09 02:34:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
> prabhuFS1): java.lang.IllegalStateException: unread block data
> at 
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2428)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
> at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 
> 
> 
> Thanks,
> Prabhu Joseph
> 
> 



Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Ted,

Is there anything in the works or are there tasks already to do the 
back-porting?

Just curious.

Thanks,
Ben

> On Mar 13, 2016, at 3:46 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> class HFileWriterImpl (in standalone file) is only present in master branch.
> It is not in branch-1.
> 
> compressionByName() resides in class with @InterfaceAudience.Private which 
> got moved in master branch.
> 
> So looks like there is some work to be done for backporting to branch-1 :-)
> 
> On Sun, Mar 13, 2016 at 1:35 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Ted,
> 
> I did as you said, but it looks like HBaseContext relies on some 
> differences in HBase itself.
> 
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:30:
>  error: object HFileWriterImpl is not a member of package 
> org.apache.hadoop.hbase.io.hfile
> [ERROR] import org.apache.hadoop.hbase.io.hfile.{CacheConfig, 
> HFileContextBuilder, HFileWriterImpl}
> [ERROR]^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:627:
>  error: not found: value HFileWriterImpl
> [ERROR] val hfileCompression = HFileWriterImpl
> [ERROR]^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:750:
>  error: not found: value HFileWriterImpl
> [ERROR] val defaultCompression = HFileWriterImpl
> [ERROR]  ^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:898:
>  error: value COMPARATOR is not a member of object 
> org.apache.hadoop.hbase.CellComparator
> [ERROR] 
> .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
> 
> So… back to my original question… do you know when these incompatibilities 
> were introduced? If so, I can pull the version from that time and try again.
> 
> Thanks,
> Ben
> 
>> On Mar 13, 2016, at 12:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> 
>> Benjamin:
>> Since hbase-spark is in its own module, you can pull the whole hbase-spark 
>> subtree into hbase 1.0 root dir and add the following to root pom.xml:
>>     <module>hbase-spark</module>
>> 
>> Then you would be able to build the module yourself.
>> 
>> hbase-spark module uses APIs which are compatible with hbase 1.0
>> 
>> Cheers
>> 
>> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> Hi Ted,
>> 
>> I see that you’re working on the hbase-spark module for hbase. I recently 
>> packaged the SparkOnHBase project and gave it a test run. It works like a 
>> charm on CDH 5.4 and 5.5. All I had to do was add 
>> /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
>> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “--jars 
>> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the 
>> easy-to-use HBaseContext for HBase operations. Now, I want to use the latest 
>> in Dataframes. Since the new functionality is only in the hbase-spark 
>> module, I want to know how to get it and package it for CDH 5.5, which still 
>> uses HBase 1.0.0. Can you tell me what version of hbase master is still 
>> backwards compatible?
>> 
>> By the way, we are using Spark 1.6 if it matters.
>> 
>> Thanks,
>> Ben
>> 
>>> On Feb 10, 2016, at 2:34 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>> 
>>> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>>> 
>>> Cheers
>>> 
>>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>>> + Spark-Dev
>>> 
>>> For a Spark job on YARN accessing an HBase table, I added all HBase client 
>>> jars to spark.yarn.dist.files. When launching the container (i.e. the 
>>> executor), the NodeManager does localization and brings all HBase client 
>>> jars into the executor CWD, but the executor tasks still fail with a 
>>> ClassNotFoundException for HBase client classes. When I checked 
>>> launch_container.sh, the classpath does not have $PWD/*, and hence all the 
>>> HBase client jars are ignored.
>>> 
>>> Is spark.yarn.dist.files not meant for adding jars to the executor classpath?
>>> 
>>> T

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Ted,

I did as you said, but it looks like HBaseContext relies on some 
differences in HBase itself.

[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:30:
 error: object HFileWriterImpl is not a member of package 
org.apache.hadoop.hbase.io.hfile
[ERROR] import org.apache.hadoop.hbase.io.hfile.{CacheConfig, 
HFileContextBuilder, HFileWriterImpl}
[ERROR]^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:627:
 error: not found: value HFileWriterImpl
[ERROR] val hfileCompression = HFileWriterImpl
[ERROR]^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:750:
 error: not found: value HFileWriterImpl
[ERROR] val defaultCompression = HFileWriterImpl
[ERROR]  ^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:898:
 error: value COMPARATOR is not a member of object 
org.apache.hadoop.hbase.CellComparator
[ERROR] 
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)

So… back to my original question… do you know when these incompatibilities were 
introduced? If so, I can pull the version from that time and try again.

Thanks,
Ben

> On Mar 13, 2016, at 12:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Benjamin:
> Since hbase-spark is in its own module, you can pull the whole hbase-spark 
> subtree into hbase 1.0 root dir and add the following to root pom.xml:
> <module>hbase-spark</module>
> 
> Then you would be able to build the module yourself.
> 
> hbase-spark module uses APIs which are compatible with hbase 1.0
> 
> Cheers
> 
> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi Ted,
> 
> I see that you’re working on the hbase-spark module for hbase. I recently 
> packaged the SparkOnHBase project and gave it a test run. It works like a 
> charm on CDH 5.4 and 5.5. All I had to do was add 
> /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “--jars 
> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the easy-to-use 
> HBaseContext for HBase operations. Now, I want to use the latest in 
> Dataframes. Since the new functionality is only in the hbase-spark module, I 
> want to know how to get it and package it for CDH 5.5, which still uses HBase 
> 1.0.0. Can you tell me what version of hbase master is still backwards 
> compatible?
> 
> By the way, we are using Spark 1.6 if it matters.
> 
> Thanks,
> Ben
> 
>> On Feb 10, 2016, at 2:34 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>> 
>> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>> 
>> Cheers
>> 
>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>> + Spark-Dev
>> 
>> For a Spark job on YARN accessing an HBase table, I added all HBase client 
>> jars to spark.yarn.dist.files. When launching the container (i.e. the 
>> executor), the NodeManager does localization and brings all HBase client 
>> jars into the executor CWD, but the executor tasks still fail with a 
>> ClassNotFoundException for HBase client classes. When I checked 
>> launch_container.sh, the classpath does not have $PWD/*, and hence all the 
>> HBase client jars are ignored.
>> 
>> Is spark.yarn.dist.files not meant for adding jars to the executor classpath?
>> 
>> Thanks,
>> Prabhu Joseph 
>> 
>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
>> Hi All,
>> 
>>  When i do count on a Hbase table from Spark Shell which runs as yarn-client 
>> mode, the job fails at count().
>> 
>> MASTER=yarn-client ./spark-shell
>> 
>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, 
>> TableName}
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>  
>> val conf = HBaseConfiguration.create()
>> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>> 
>> val hBaseRDD = sc.newAPIHadoopRDD(conf, 
>> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
>> hBaseRDD.count()
>> 
>> 
>> Tasks throw below exception, the actual exception is swallowed, a bug 
>> JDK

S3 Zip File Loading Advice

2016-03-08 Thread Benjamin Kim
I am wondering if anyone can help.

Our company stores zipped CSV files in S3, which has been a big headache from 
the start. I was wondering if anyone has created a way to iterate through 
several subdirectories (s3n://events/2016/03/01/00, s3n://events/2016/03/01/01, etc.) 
in S3 to find the newest files and load them. It would be a big bonus to 
include the unzipping of the file in the process so that the CSV can be loaded 
directly into a dataframe for further processing. I’m pretty sure that the S3 
part of this request is not uncommon. I would think the file being zipped is 
uncommon. If anyone can help, I would truly be grateful for I am new to Scala 
and Spark. This would be a great help in learning.

Thanks,
Ben



Re: S3 Zip File Loading Advice

2016-03-09 Thread Benjamin Kim
Hi Sabarish,

I found a similar posting online where I should use the S3 listKeys. 
http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd.
 Is this what you were thinking?
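
The "diff between consecutive listings" idea from Sabarish's reply can be sketched in plain Scala. This is a minimal sketch, assuming each listing has already been collected into a set of key strings; the name NewKeys is hypothetical and nothing here calls the AWS SDK.

```scala
// Hypothetical helper, not part of any library.
object NewKeys {
  // Keys present in the current listing but absent from the previous one,
  // sorted so the result is stable from run to run.
  def added(previous: Set[String], current: Set[String]): List[String] =
    (current diff previous).toList.sorted
}
```

Keeping the previous listing around between runs (in memory or in a small state file) is what makes this work; it avoids relying on last-modified timestamps.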

And, your assumption is correct. The zipped CSV file contains only a single 
file. I found this posting. 
http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark. I see 
how to do the unzipping, but I cannot get it to work when running the code 
directly.

...
import java.io.{ IOException, FileOutputStream, FileInputStream, File }
import java.util.zip.{ ZipEntry, ZipInputStream }
import org.apache.spark.input.PortableDataStream

sc.hadoopConfiguration.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)

val zipFile = 
"s3n://events/2016/03/01/00/event-20160301.00-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip"
val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String, content: 
PortableDataStream) => new ZipInputStream(content.open) }

<console>:95: error: type mismatch;
 found   : java.util.zip.ZipInputStream
 required: TraversableOnce[?]
 val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name, content) => new ZipInputStream(content.open) }
^
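
The type mismatch comes from flatMap expecting its function to return a collection, while the body above returns the stream itself. A minimal sketch of the shape that compiles, using only the JDK and plain Scala collections rather than Spark: the names ZipLines, makeZip and firstEntryLines are made up for illustration, and the in-memory archive stands in for a file fetched from S3.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}
import scala.io.Source

// Hypothetical helper, not part of any library.
object ZipLines {
  // Build a one-entry zip archive in memory (demo input only).
  def makeZip(entryName: String, text: String): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val zos = new ZipOutputStream(buffer)
    zos.putNextEntry(new ZipEntry(entryName))
    zos.write(text.getBytes(UTF_8))
    zos.closeEntry()
    zos.close()
    buffer.toByteArray
  }

  // Return the lines of the first entry as a List, a type flatMap accepts.
  def firstEntryLines(zipBytes: Array[Byte]): List[String] = {
    val zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))
    try {
      if (zis.getNextEntry == null) Nil
      else Source.fromInputStream(zis, "UTF-8").getLines().toList
    } finally zis.close()
  }
}

// flatMap over a plain Seq of archives, one List of lines per archive.
val allLines = Seq(
  ZipLines.makeZip("a.csv", "id,name\n1,x\n"),
  ZipLines.makeZip("b.csv", "id,name\n2,y\n")
).flatMap(ZipLines.firstEntryLines)
```

In the Spark snippet the same idea would mean returning the lines read from content.open() from the flatMap body, rather than the ZipInputStream itself.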

Thanks,
Ben

> On Mar 9, 2016, at 12:03 AM, Sabarish Sasidharan <sabarish@gmail.com> 
> wrote:
> 
> You can use S3's listKeys API and do a diff between consecutive listKeys to 
> identify what's new.
> 
> Are there multiple files in each zip? Single file archives are processed just 
> like text as long as it is one of the supported compression formats.
> 
> Regards
> Sab
> 
> On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> I am wondering if anyone can help.
> 
> Our company stores zipped CSV files in S3, which has been a big headache from 
> the start. I was wondering if anyone has created a way to iterate through 
> several subdirectories (s3n://events/2016/03/01/00, s3n://2016/03/01/01, 
> etc.) in S3 to find the newest files and load them. It would be a big bonus 
> to include the unzipping of the file in the process so that the CSV can be 
> loaded directly into a dataframe for further processing. I’m pretty sure that 
> the S3 part of this request is not uncommon. I would think the file being 
> zipped is uncommon. If anyone can help, I would truly be grateful for I am 
> new to Scala and Spark. This would be a great help in learning.
> 
> Thanks,
> Ben
> 
> 



Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
Hi Mich,

You are correct. I am talking about the Databricks package spark-csv you have 
below.

The files are stored in s3 and I download, unzip, and store each one of them in 
a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).

Here is some of the code.

val filesRdd = sc.parallelize(lFiles, 250)
filesRdd.foreachPartition(files => {
  val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider())
  files.foreach(file => {
    val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
    val zipFile = new ZipInputStream(s3Object.getObjectContent())
    val csvFile = readZipStream(zipFile)
  })
})

This function does the unzipping and converts to string.

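import java.util.Scanner
import java.util.zip.ZipInputStream
import scala.collection.mutable.ListBuffer

def readZipStream(stream: ZipInputStream): String = {
  // Position the stream at the first (and only) entry in the archive
  stream.getNextEntry
  val stuff = new ListBuffer[String]()
  val scanner = new Scanner(stream)
  while (scanner.hasNextLine) {
    stuff += scanner.nextLine
  }
  stuff.toList.mkString("\n")
}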

The next step is to parse the CSV string and convert to a dataframe, which will 
populate a Hive/HBase table.

If you can help, I would be truly grateful.

Thanks,
Ben


> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> Just to clarify, are you talking about the Databricks CSV package?
> 
> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
> 
> Where are these zipped files? Are they copied to a staging directory in hdfs?
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  
> 
> On 30 March 2016 at 15:17, Benjamin Kim <bbuil...@gmail.com> wrote:
> I have a quick question. I have downloaded multiple zipped files from S3 and 
> unzipped each one of them into strings. The next step is to parse using a CSV 
> parser. I want to know if there is a way to easily use the spark csv package 
> for this?
> 
> Thanks,
> Ben
> 
> 



Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
Hi Mich,

I forgot to mention that - this is the ugly part - the source data provider 
gives us (Windows) pkzip compressed files. Will spark uncompress these 
automatically? I haven’t been able to make it work.

Thanks,
Ben

> On Mar 30, 2016, at 2:27 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> Hi Ben,
> 
> Well, I have done it for standard CSV files downloaded from spreadsheets to a 
> staging directory on HDFS and loaded from there.
> 
> First, you may not need to unzip them. The Databricks package can read 
> compressed files (in my case).
> 
> Check this. Mine is slightly different from what you have. First I compress 
> my CSV files with bzip2 and load them into HDFS.
> 
> #!/bin/ksh
> DIR="/data/stg/accounts/nw/10124772"
> #
> ## Compress the files
> #
> echo `date` " ""===  Started compressing all csv FILEs"
> for FILE in `ls *.csv`
> do
>   /usr/bin/bzip2 ${FILE}
> done
> #
> ## Clear out hdfs staging directory
> #
> echo `date` " ""===  Started deleting old files from hdfs staging 
> directory ${DIR}"
> hdfs dfs -rm -r ${DIR}/*.bz2
> echo `date` " ""===  Started Putting bz2 fileS to hdfs staging directory 
> ${DIR}"
> for FILE in `ls *.bz2`
> do
>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
> done
> echo `date` " ""===  Checking that all files are moved to hdfs staging 
> directory"
> hdfs dfs -ls ${DIR}
> exit 0
> 
> Now you have all your csv files in the staging directory
> 
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') 
> ").collect.foreach(println)
> 
> val df = 
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", 
> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
> case class Accounts( TransactionDate: String, TransactionType: String, 
> Description: String, Value: Double, Balance: Double, AccountName: String, 
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p => 
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
> 
> // Need to create and populate target ORC table nw_10124772 in database 
> // accounts in Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate   DATE
> ,TransactionType   String
> ,Description   String
> ,Value Double
> ,Balance   Double
> ,AccountName   String
> ,AccountNumber Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>   
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
>  AS TransactionDate
> , TransactionType
> , Description
> , Value
> , Balance
> , AccountName
>     , AccountNumber
> FROM tmp
> """
> sql(sqltext)
> 
> println ("\nFinished at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.fore
> 
> Once you store the data in some form of table (Parquet, ORC, etc.) you can do 
> whatever you like with it.
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn: 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 
> http://talebzadehmich.wordpress.com
>  
> 
> On 30 March 2016 at 22:13, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Mich,
> 
> You are correct. I am talking about the Databricks package spark-csv you ha

Re: Monitoring S3 Bucket with Spark Streaming

2016-04-12 Thread Benjamin Kim
All,

I have more of a general Scala JSON question.

I have set up a notification on the S3 source bucket that triggers a Lambda 
function to unzip the new file placed there. Then, it saves the unzipped CSV 
file into another destination bucket, where a notification is sent to an SQS 
queue. The message body is JSON whose top level is the “Records” collection, 
which contains one or more “Record” objects. I would like to know how to 
iterate through “Records”, retrieving each “Record” to extract the bucket 
value and the key value. I would then use this information to download the 
file into a DataFrame via spark-csv. Does anyone have any experience doing 
this?

I took a quick stab at it, but I know it’s not right.

def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(30))   // new context
val records = ssc.receiverStream(new SQSReceiver("amg-events")
.credentials(accessKey, secretKey)
.at(Regions.US_EAST_1)
.withTimeout(2))

records.foreach(record => {
val bucket = record['s3']['bucket']['name']
val key = record['s3']['object']['key']
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("s3://" + bucket + "/" + key)
//save to hbase
})

ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
ssc
}

Thanks,
Ben
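
For the JSON part of the question above, here is a minimal sketch rather than a tested solution. It assumes json4s (which ships with Spark) is on the classpath and that the SQS receiver delivers each message body as a plain String. S3EventParser, bucketAndKeys, and the sample body are hypothetical names and data made up for illustration; the field names follow the S3 event notification layout. Object keys in these events are URL-encoded, hence the decode step.

```scala
import java.net.URLDecoder
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object S3EventParser {
  implicit val formats: Formats = DefaultFormats

  /** Returns one (bucket, key) pair per entry under "Records". */
  def bucketAndKeys(body: String): List[(String, String)] =
    (parse(body) \ "Records").children.map { rec =>
      val bucket = (rec \ "s3" \ "bucket" \ "name").extract[String]
      val rawKey = (rec \ "s3" \ "object" \ "key").extract[String]
      // Keys arrive URL-encoded in S3 event notifications
      (bucket, URLDecoder.decode(rawKey, "UTF-8"))
    }
}

// Hypothetical, heavily trimmed message body
val sample =
  """{"Records":[{"eventName":"ObjectCreated:Put","s3":{"bucket":{"name":"my-bucket"},"object":{"key":"2016/04/12/data.csv"}}}]}"""

S3EventParser.bucketAndKeys(sample).foreach { case (bucket, key) =>
  println("s3://" + bucket + "/" + key)
}
```

Inside the streaming job, the message bodies would normally be handled with records.foreachRDD, collecting the (small) set of bodies to the driver before calling sqlContext.read for each bucket/key pair, since sqlContext is only usable on the driver. If the notification passes through SNS before reaching SQS, the S3 event is likely nested as a string inside a "Message" field and would need a second parse.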

> On Apr 9, 2016, at 6:12 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> Ah, I spoke too soon.
> 
> I thought the SQS part was going to be a Spark package. It looks like it has 
> to be compiled into a jar for use. Am I right? Can someone help with this? I 
> tried to compile it using SBT, but I’m stuck with a SonatypeKeys not found 
> error.
> 
> If there’s an easier alternative, please let me know.
> 
> Thanks,
> Ben
> 
> 
>> On Apr 9, 2016, at 2:49 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>> This was easy!
>> 
>> I just created a notification on a source S3 bucket to kick off a Lambda 
>> function that would decompress the dropped file and save it to another S3 
>> bucket. In turn, this S3 bucket has a notification to send an SNS message 
>> to me via email. I can just as easily set up SQS to be the endpoint of this 
>> notification. This would then convey to a listening Spark Streaming job the 
>> file information to download. I like this!
>> 
>> Cheers,
>> Ben 
>> 
>>> On Apr 9, 2016, at 9:54 AM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> 
>>> This is awesome! I have someplace to start from.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>>> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com 
>>>> <mailto:programminggee...@gmail.com> wrote:
>>>> 
>>>> Someone please correct me if I am wrong as I am still rather green to 
>>>> spark, however it appears that through the S3 notification mechanism 
>>>> described below, you can publish events to SQS and use SQS as a streaming 
>>>> source into spark. The project at 
>>>> https://github.com/imapi/spark-sqs-receiver 
>>>> <https://github.com/imapi/spark-sqs-receiver> appears to provide libraries 
>>>> for doing this.
>>>> 
>>>> Hope this helps.
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com 
>>>> <mailto:bbuil...@gmail.com>> wrote:
>>>> 
>>>>> Nezih,
>>>>> 
>>>>> This looks like a good alternative to having the Spark Streaming job 
>>>>> check for new files on its own. Do you know if there is a way to have the 
>>>>> Spark Streaming job get notified with the new file information and act 
>>>>> upon it? This can reduce the overhead and cost of polling S3. Plus, I can 
>>>>> use this to notify and kick off Lambda to process new data files and make 
>>>>> them ready for Spark Streaming to consume. This will also use 
>>>>> notifications to trigger. I just need to have all incoming folders 
>>>>> configured for notifications for Lambda and all outgoing folders for 
>>>>> Spark Streaming. This sounds like a better set

Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
This is awesome! I have someplace to start from.

Thanks,
Ben


> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com wrote:
> 
> Someone please correct me if I am wrong as I am still rather green to spark, 
> however it appears that through the S3 notification mechanism described 
> below, you can publish events to SQS and use SQS as a streaming source into 
> spark. The project at https://github.com/imapi/spark-sqs-receiver 
> <https://github.com/imapi/spark-sqs-receiver> appears to provide libraries 
> for doing this.
> 
> Hope this helps.
> 
> Sent from my iPhone
> 
> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> 
>> Nezih,
>> 
>> This looks like a good alternative to having the Spark Streaming job check 
>> for new files on its own. Do you know if there is a way to have the Spark 
>> Streaming job get notified with the new file information and act upon it? 
>> This can reduce the overhead and cost of polling S3. Plus, I can use this to 
>> notify and kick off Lambda to process new data files and make them ready for 
>> Spark Streaming to consume. This will also use notifications to trigger. I 
>> just need to have all incoming folders configured for notifications for 
>> Lambda and all outgoing folders for Spark Streaming. This sounds like a 
>> better setup than we have now.
>> 
>> Thanks,
>> Ben
>> 
>>> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com 
>>> <mailto:nyigitb...@netflix.com>> wrote:
>>> 
>>> While it is doable in Spark, S3 also supports notifications: 
>>> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html 
>>> <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>
>>> 
>>> 
>>> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com 
>>> <mailto:nlaucha...@gmail.com>> wrote:
>>> Hi Benjamin,
>>> 
>>> I have done it . The critical configuration items are the ones below :
>>> 
>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
>>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
>>> AccessKeyId)
>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
>>> AWSSecretAccessKey)
>>> 
>>>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")
>>> 
>>> This code will probe for new S3 files created in every batch interval.
>>> 
>>> Thanks,
>>> Natu
>>> 
>>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
>>> pulled any new files to process? If so, can you provide basic Scala coding 
>>> help on this?
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> 
>>> 
>> 



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
Natu,

Do you know if textFileStream can see if new files are created underneath a 
whole bucket? For example, if the bucket name is incoming and new files 
underneath it are 2016/04/09/00/00/01/data.csv and 
2016/04/09/00/00/02/data.csv, will these files be picked up? Also, will Spark 
Streaming not pick up these files again on the following run knowing that it 
already picked them up or do we have to store state somewhere, like the last 
run date and time to compare against?

Thanks,
Ben

> On Apr 8, 2016, at 9:15 PM, Natu Lauchande <nlaucha...@gmail.com> wrote:
> 
> Hi Benjamin,
> 
> I have done it . The critical configuration items are the ones below :
> 
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
> AccessKeyId)
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
> AWSSecretAccessKey)
> 
>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")
> 
> This code will probe for new S3 files created in every batch interval.
> 
> Thanks,
> Natu
> 
> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
> pulled any new files to process? If so, can you provide basic Scala coding 
> help on this?
> 
> Thanks,
> Ben
> 
> 
> 
> 



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
Nezih,

This looks like a good alternative to having the Spark Streaming job check for 
new files on its own. Do you know if there is a way to have the Spark Streaming 
job get notified with the new file information and act upon it? This can reduce 
the overhead and cost of polling S3. Plus, I can use this to notify and kick 
off Lambda to process new data files and make them ready for Spark Streaming to 
consume. This will also use notifications to trigger. I just need to have all 
incoming folders configured for notifications for Lambda and all outgoing 
folders for Spark Streaming. This sounds like a better setup than we have now.

Thanks,
Ben

> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com> wrote:
> 
> While it is doable in Spark, S3 also supports notifications: 
> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html 
> <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>
> 
> 
> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com 
> <mailto:nlaucha...@gmail.com>> wrote:
> Hi Benjamin,
> 
> I have done it . The critical configuration items are the ones below :
> 
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
> AccessKeyId)
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
> AWSSecretAccessKey)
> 
>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")
> 
> This code will probe for new S3 files created in every batch interval.
> 
> Thanks,
> Natu
> 
> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
> pulled any new files to process? If so, can you provide basic Scala coding 
> help on this?
> 
> Thanks,
> Ben
> 
> 
> 
> 



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
Ah, I spoke too soon.

I thought the SQS part was going to be a Spark package. It looks like it has to 
be compiled into a jar for use. Am I right? Can someone help with this? I tried 
to compile it using SBT, but I’m stuck with a SonatypeKeys not found error.

If there’s an easier alternative, please let me know.

Thanks,
Ben


> On Apr 9, 2016, at 2:49 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> This was easy!
> 
> I just created a notification on a source S3 bucket to kick off a Lambda 
> function that would decompress the dropped file and save it to another S3 
> bucket. In turn, this S3 bucket has a notification to send an SNS message to 
> me via email. I can just as easily set up SQS to be the endpoint of this 
> notification. This would then convey to a listening Spark Streaming job the 
> file information to download. I like this!
> 
> Cheers,
> Ben 
> 
>> On Apr 9, 2016, at 9:54 AM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>> This is awesome! I have someplace to start from.
>> 
>> Thanks,
>> Ben
>> 
>> 
>>> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com 
>>> <mailto:programminggee...@gmail.com> wrote:
>>> 
>>> Someone please correct me if I am wrong as I am still rather green to 
>>> spark, however it appears that through the S3 notification mechanism 
>>> described below, you can publish events to SQS and use SQS as a streaming 
>>> source into spark. The project at 
>>> https://github.com/imapi/spark-sqs-receiver 
>>> <https://github.com/imapi/spark-sqs-receiver> appears to provide libraries 
>>> for doing this.
>>> 
>>> Hope this helps.
>>> 
>>> Sent from my iPhone
>>> 
>>> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> 
>>>> Nezih,
>>>> 
>>>> This looks like a good alternative to having the Spark Streaming job check 
>>>> for new files on its own. Do you know if there is a way to have the Spark 
>>>> Streaming job get notified with the new file information and act upon it? 
>>>> This can reduce the overhead and cost of polling S3. Plus, I can use this 
>>>> to notify and kick off Lambda to process new data files and make them 
>>>> ready for Spark Streaming to consume. This will also use notifications to 
>>>> trigger. I just need to have all incoming folders configured for 
>>>> notifications for Lambda and all outgoing folders for Spark Streaming. 
>>>> This sounds like a better setup than we have now.
>>>> 
>>>> Thanks,
>>>> Ben
>>>> 
>>>>> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com 
>>>>> <mailto:nyigitb...@netflix.com>> wrote:
>>>>> 
>>>>> While it is doable in Spark, S3 also supports notifications: 
>>>>> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html 
>>>>> <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>
>>>>> 
>>>>> 
>>>>> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com 
>>>>> <mailto:nlaucha...@gmail.com>> wrote:
>>>>> Hi Benjamin,
>>>>> 
>>>>> I have done it . The critical configuration items are the ones below :
>>>>> 
>>>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
>>>>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
>>>>> AccessKeyId)
>>>>>   
>>>>> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
>>>>> AWSSecretAccessKey)
>>>>> 
>>>>>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")
>>>>> 
>>>>> This code will probe for new S3 files created in every batch interval.
>>>>> 
>>>>> Thanks,
>>>>> Natu
>>>>> 
>>>>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
>>>>> <mailto:bbuil...@gmail.com>> wrote:
>>>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
>>>>> pulled any new files to process? If so, can you provide basic Scala 
>>>>> coding help on this?
>>>>> 
>>>>> Thanks,
>>>>> Ben
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 
> 



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
This was easy!

I just created a notification on a source S3 bucket to kick off a Lambda 
function that would decompress the dropped file and save it to another S3 
bucket. In turn, this S3 bucket has a notification to send an SNS message to 
me via email. I can just as easily set up SQS to be the endpoint of this 
notification. This would then convey to a listening Spark Streaming job the 
file information to download. I like this!

Cheers,
Ben 

> On Apr 9, 2016, at 9:54 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> This is awesome! I have someplace to start from.
> 
> Thanks,
> Ben
> 
> 
>> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com 
>> <mailto:programminggee...@gmail.com> wrote:
>> 
>> Someone please correct me if I am wrong as I am still rather green to spark, 
>> however it appears that through the S3 notification mechanism described 
>> below, you can publish events to SQS and use SQS as a streaming source into 
>> spark. The project at https://github.com/imapi/spark-sqs-receiver 
>> <https://github.com/imapi/spark-sqs-receiver> appears to provide libraries 
>> for doing this.
>> 
>> Hope this helps.
>> 
>> Sent from my iPhone
>> 
>> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>>> Nezih,
>>> 
>>> This looks like a good alternative to having the Spark Streaming job check 
>>> for new files on its own. Do you know if there is a way to have the Spark 
>>> Streaming job get notified with the new file information and act upon it? 
>>> This can reduce the overhead and cost of polling S3. Plus, I can use this 
>>> to notify and kick off Lambda to process new data files and make them ready 
>>> for Spark Streaming to consume. This will also use notifications to 
>>> trigger. I just need to have all incoming folders configured for 
>>> notifications for Lambda and all outgoing folders for Spark Streaming. This 
>>> sounds like a better setup than we have now.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>>> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com 
>>>> <mailto:nyigitb...@netflix.com>> wrote:
>>>> 
>>>> While it is doable in Spark, S3 also supports notifications: 
>>>> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html 
>>>> <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>
>>>> 
>>>> 
>>>> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com 
>>>> <mailto:nlaucha...@gmail.com>> wrote:
>>>> Hi Benjamin,
>>>> 
>>>> I have done it . The critical configuration items are the ones below :
>>>> 
>>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
>>>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
>>>> AccessKeyId)
>>>>   
>>>> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
>>>> AWSSecretAccessKey)
>>>> 
>>>>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")
>>>> 
>>>> This code will probe for new S3 files created in every batch interval.
>>>> 
>>>> Thanks,
>>>> Natu
>>>> 
>>>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
>>>> <mailto:bbuil...@gmail.com>> wrote:
>>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
>>>> pulled any new files to process? If so, can you provide basic Scala coding 
>>>> help on this?
>>>> 
>>>> Thanks,
>>>> Ben
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
> 



Monitoring S3 Bucket with Spark Streaming

2016-04-08 Thread Benjamin Kim
Has anyone monitored an S3 bucket or directory using Spark Streaming and pulled 
any new files to process? If so, can you provide basic Scala coding help on 
this?

Thanks,
Ben





BinaryFiles to ZipInputStream

2016-03-23 Thread Benjamin Kim
I need a little help. I am loading into Spark 1.6 zipped csv files stored in s3.

First of all, I am able to get the List of file keys that have a modified date 
within a range of time by using the AWS SDK Objects (AmazonS3Client, 
ObjectListing, S3ObjectSummary, ListObjectsRequest, GetObjectRequest). Then, by 
setting up the HadoopConfiguration object with s3 access and secret keys, I 
parallelize, partition, and iterate through the List to load each file’s 
contents into a RDD[(String, org.apache.spark.input.PortableDataStream)].

val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", accessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", secretKey)

val filesRdd = sc.parallelize(lFiles)
filesRdd.foreachPartition(files => {
  val lZipFiles = files.map(x => sc.binaryFiles("s3://" + s3Bucket + "/" + x))
  val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all 
zip input streams
  val lStrContent = lZipStream.map(x => readZipStream(x))  // read contents 
into string 

})

This is where I need help. I get this error.

:196: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, 
org.apache.spark.input.PortableDataStream)]
 required: java.io.InputStream
val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // 
make them all zip input streams

   ^

Does anyone know how to load the PortableDataStream returned in a RDD and 
convert it into a ZipInputStream?

Thanks,
Ben
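
On the type mismatch above: sc.binaryFiles returns an RDD of (path, PortableDataStream) pairs, not a stream, and a SparkContext cannot be used inside foreachPartition because it only exists on the driver. A possible sketch of the unzip step follows, assuming the archives hold UTF-8 text. ZipText and readZipEntries are hypothetical names introduced here, and the Spark hand-off in the trailing comment is an untested assumption that relies on PortableDataStream.open() returning an InputStream.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}
import scala.io.Source

object ZipText {
  /** Reads every file entry of a zip stream into (entryName, contents). */
  def readZipEntries(in: InputStream): List[(String, String)] = {
    val zis = new ZipInputStream(in)
    try {
      Iterator
        .continually(zis.getNextEntry)
        .takeWhile(_ != null)
        .filterNot(_.isDirectory)
        .map { entry =>
          // Reading stops at the end of the current entry, not the whole archive
          (entry.getName, Source.fromInputStream(zis, "UTF-8").mkString)
        }
        .toList // forced here, while the stream is still open
    } finally zis.close()
  }
}

// Self-check with a small in-memory archive
val buf = new ByteArrayOutputStream()
val zos = new ZipOutputStream(buf)
zos.putNextEntry(new ZipEntry("a.csv"))
zos.write("id,name\n1,x\n".getBytes("UTF-8"))
zos.closeEntry()
zos.close()
println(ZipText.readZipEntries(new ByteArrayInputStream(buf.toByteArray)))

// Possible Spark hand-off (assumption, run on the driver):
//   val paths = lFiles.map(k => "s3://" + s3Bucket + "/" + k).mkString(",")
//   val contents = sc.binaryFiles(paths)
//     .flatMap { case (_, pds) => ZipText.readZipEntries(pds.open()) }
```

The design choice is to let binaryFiles distribute the archives and do the unzipping inside flatMap, so no RDD is created from within another RDD operation.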



SFTP Compressed CSV into Dataframe

2016-03-02 Thread Benjamin Kim
I wonder if anyone has opened an SFTP connection to read a remote GZIP CSV file? 
I am able to download the file first locally using the SFTP Client in the 
spark-sftp package. Then, I load the file into a dataframe using the spark-csv 
package, which automatically decompresses the file. I just want to remove the 
"downloading file to local" step and directly have the remote file 
decompressed, read, and loaded. Can someone give me any hints?

Thanks,
Ben
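
One direction that might avoid the local download, sketched under assumptions: if the SFTP client can expose the remote file as an InputStream (JSch's ChannelSftp.get(path) does; whether spark-sftp offers this is not known here), the gzip data can be decompressed while it is being read. GzipLines and readLines are hypothetical names, and the self-check uses an in-memory stream in place of a real SFTP connection.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, ByteArrayOutputStream, InputStream, InputStreamReader}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipLines {
  /** Decompresses a gzip stream on the fly and returns its lines. */
  def readLines(in: InputStream): List[String] = {
    val reader = new BufferedReader(
      new InputStreamReader(new GZIPInputStream(in), "UTF-8"))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally reader.close()
  }
}

// Self-check: an in-memory gzip stream stands in for the remote file
val bytes = new ByteArrayOutputStream()
val gz = new GZIPOutputStream(bytes)
gz.write("id,name\n1,x\n".getBytes("UTF-8"))
gz.close()
println(GzipLines.readLines(new ByteArrayInputStream(bytes.toByteArray)))
```

The resulting lines could then be turned into an RDD with sc.parallelize and parsed as CSV. Note that this reads the whole file in a single JVM, which is probably acceptable only for files that fit in memory; a gzip file is not splittable in any case.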






Re: Building a REST Service with Spark back-end

2016-03-02 Thread Benjamin Kim
I want to ask about something related to this.

Does anyone know if there is or will be a command line equivalent of the 
spark-shell client for Livy Spark Server or any other Spark Job Server? The 
reason I am asking is that spark-shell does not handle multiple users on the 
same server well. Since a Spark Job Server can generate "sessions" for each 
user, it would be great if this were possible.

Another person in the Livy users group pointed out some advantages.

I think the use case makes complete sense for a number of reasons:
1. You wouldn't need an installation of spark and configs on the gateway machine
2. Since Livy is over HTTP, it'd be easier to run spark-shell in front of a 
firewall
3. Can "connect/disconnect" to sessions similar to screen in linux

Thanks,
Ben

> On Mar 2, 2016, at 1:11 PM, Guru Medasani  wrote:
> 
> Hi Yanlin,
> 
> This is a fairly new effort and is not officially released/supported by 
> Cloudera yet. I believe those numbers will be out once it is released.
> 
> Guru Medasani
> gdm...@gmail.com 
> 
> 
> 
>> On Mar 2, 2016, at 10:40 AM, yanlin wang wrote:
>> 
>> Has anyone used Livy in a real-world, high-concurrency web app? I think it 
>> uses the spark-submit command line to create jobs... How do the job server 
>> or notebooks compare with Livy?
>> 
>> Thx,
>> Yanlin
>> 
>> Sent from my iPhone
>> 
>> On Mar 2, 2016, at 6:24 AM, Guru Medasani wrote:
>> 
>>> Hi Don,
>>> 
>>> Here is another REST interface for interacting with Spark from anywhere. 
>>> 
>>> https://github.com/cloudera/livy 
>>> 
>>> Here is an example to estimate PI using Spark from Python using requests 
>>> library. 
>>> 
>>> >>> data = {
>>> ...   'code': textwrap.dedent("""\
>>> ...      val NUM_SAMPLES = 100000;
>>> ...      val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
>>> ...        val x = Math.random();
>>> ...        val y = Math.random();
>>> ...        if (x*x + y*y < 1) 1 else 0
>>> ...      }.reduce(_ + _);
>>> ...      println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
>>> ...      """)
>>> ... }
>>> >>> r = requests.post(statements_url, data=json.dumps(data), headers=headers)
>>> >>> pprint.pprint(r.json())
>>> {u'id': 1,
>>>  u'output': {u'data': {u'text/plain': u'Pi is roughly 3.14004\nNUM_SAMPLES: Int = 100000\ncount: Int = 78501'},
>>>              u'execution_count': 1,
>>>              u'status': u'ok'},
>>>  u'state': u'available'}
>>> 
>>> 
>>> Guru Medasani
>>> gdm...@gmail.com 
>>> 
>>> 
>>> 
>>>> On Mar 2, 2016, at 7:47 AM, Todd Nist wrote:
>>>> 
>>>> Have you looked at Apache Toree (http://toree.apache.org/)? This was 
>>>> formerly the Spark-Kernel from IBM but was contributed to Apache.
>>>> 
>>>> https://github.com/apache/incubator-toree
>>>> 
>>>> You can find a good overview on the spark-kernel here:
>>>> http://www.spark.tc/how-to-enable-interactive-applications-against-apache-spark/
>>>> 
>>>> Not sure if that is of value to you or not.
>>>> 
>>>> HTH.
>>>> 
>>>> -Todd
>>>> 
>>>> On Tue, Mar 1, 2016 at 7:30 PM, Don Drake wrote:
>>>> I'm interested in building a REST service that utilizes a Spark SQL 
>>>> Context to return records from a DataFrame (or IndexedRDD?) and even 
>>>> add/update records.
>>>> 
>>>> This will be a simple REST API, with only a few end-points.  I found this 
>>>> example:
>>>> 
>>>> https://github.com/alexmasselot/spark-play-activator
>>>> 
>>>> which looks close to what I am interested in doing.
>>>> 
>>>> Are there any other ideas or options if I want to run this in a YARN 
>>>> cluster?
>>>> 
>>>> Thanks.
>>>> 
>>>> -Don
>>>> 
>>>> -- 
>>>> Donald Drake
>>>> Drake Consulting
>>>> http://www.drakeconsulting.com/
>>>> https://twitter.com/dondrake
>>>> 800-733-2143
>>> 
> 



Re: SFTP Compressed CSV into Dataframe

2016-03-03 Thread Benjamin Kim
Sumedh,

How would this work? The only server that we have is the Oozie server with no 
resources to run anything except Oozie, and we have no sudo permissions. If we 
run the mount command using the shell action which can run on any node of the 
cluster via YARN, then the spark job will not be able to see it because it 
could exist on any random unknown node. If we run the mount command using shell 
commands in Spark, is it possible that the mount will exist on the same 
node as the executor reading the file?

Thanks,
Ben 

> On Mar 3, 2016, at 10:29 AM, Sumedh Wale <sw...@snappydata.io> wrote:
> 
> (-user)
> 
> On Thursday 03 March 2016 10:09 PM, Benjamin Kim wrote:
>> I forgot to mention that we will be scheduling this job using Oozie. So, we 
>> will not be able to know which worker node is going to be running this. 
>> If we try to do anything local, it would get lost. This is why I’m looking 
>> for something that does not deal with the local file system.
> 
> Can't you mount using sshfs locally as part of the job at the start, then 
> unmount at the end? This is assuming that the platform being used is Linux.
> 
>>> On Mar 2, 2016, at 11:17 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>> 
>>> I wonder if anyone has opened a SFTP connection to open a remote GZIP CSV 
>>> file? I am able to download the file first locally using the SFTP Client in 
>>> the spark-sftp package. Then, I load the file into a dataframe using the 
>>> spark-csv package, which automatically decompresses the file. I just want 
>>> to remove the "downloading file to local" step and directly have the remote 
>>> file decompressed, read, and loaded. Can someone give me any hints?
>>> 
>>> Thanks,
>>> Ben
> 
> thanks
> 
> -- 
> Sumedh Wale
> SnappyData (http://www.snappydata.io)
> 
> 





Re: Steps to Run Spark Scala job from Oozie on EC2 Hadoop cluster

2016-03-07 Thread Benjamin Kim
To comment…

At my company, we have not gotten it to work in any other mode than local. If 
we try any of the yarn modes, it fails with a “file does not exist” error when 
trying to locate the executable jar. I mentioned this to the Hue users group, 
which we used for this, and they replied that the Spark Action is a very basic 
implementation and that they will be writing their own for production use.

That’s all I know...

> On Mar 7, 2016, at 1:18 AM, Deepak Sharma  wrote:
> 
> There is Spark action defined for oozie workflows.
> Though I am not sure if it supports only Java SPARK jobs or Scala jobs as 
> well.
> https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html 
> 
> Thanks
> Deepak
> 
> On Mon, Mar 7, 2016 at 2:44 PM, Divya Gehlot  > wrote:
> Hi,
> 
> Could somebody help me by providing the steps /redirect me  to 
> blog/documentation on how to run Spark job written in scala through Oozie.
> 
> Would really appreciate the help.
> 
> 
> 
> Thanks,
> Divya 
> 
> 
> 
> -- 
> Thanks
> Deepak
> www.bigdatabig.com 
> www.keosha.net 


can spark-csv package accept strings instead of files?

2016-04-01 Thread Benjamin Kim
Does anyone know if this is possible? I have an RDD loaded with CSV data 
strings. Each string contains a header row and multiple rows of data, 
delimiters included. I would like to feed each string through a CSV parser to 
convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
with this data.

Please let me know if you have any ideas.

Thanks,
Ben
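
For what it's worth, a minimal sketch of the parsing step in plain Scala, under the assumption that each string is one small CSV document whose first line is the header. CsvBlob and parse are hypothetical names, not part of spark-csv, and the split is naive: it does not handle quoted fields that contain the delimiter.

```scala
import java.util.regex.Pattern

object CsvBlob {
  // Split one CSV blob (header line + data lines) into header -> value maps.
  // Rows with more or fewer fields than the header are truncated by zip.
  def parse(blob: String, delimiter: Char = ','): Seq[Map[String, String]] = {
    val lines = blob.split("\r?\n").filter(_.nonEmpty)
    if (lines.isEmpty) Seq.empty
    else {
      val header = lines.head.split(delimiter)
      // Quote the delimiter so characters like '|' are not read as regex syntax.
      val fieldSep = Pattern.quote(delimiter.toString)
      lines.tail.toSeq.map { line =>
        // limit -1 keeps trailing empty fields
        header.zip(line.split(fieldSep, -1)).toMap
      }
    }
  }
}
```

With an RDD[String] called rdd, something like rdd.flatMap(blob => CsvBlob.parse(blob)) would give one record per data row. Turning those records into a DataFrame with a proper schema is a separate step.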



Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
I have a quick question. I have downloaded multiple zipped files from S3 and 
unzipped each one of them into strings. The next step is to parse using a CSV 
parser. I want to know if there is a way to easily use the spark csv package 
for this?

Thanks,
Ben



Re: new object store driver for Spark

2016-03-22 Thread Benjamin Kim
Hi Gil,

Currently, our company uses S3 heavily for data storage. Can you further 
explain the benefits of this in relation to S3 when the pending patch does come 
out? Also, I have heard of Swift from others. Can you explain to me the pros 
and cons of Swift compared to HDFS? It can be just a brief summary if you like 
or just guide me to material that will help me get a better understanding.

Thanks,
Ben

> On Mar 22, 2016, at 6:35 AM, Gil Vernik  wrote:
> 
> We recently released an object store connector for Spark. 
> https://github.com/SparkTC/stocator 
> Currently this connector contains driver for the Swift based object store ( 
> like SoftLayer or any other Swift cluster ), but it can easily support 
> additional object stores.
> There is a pending patch to support Amazon S3 object store. 
> 
> The major highlight is that this connector doesn't create any temporary files 
> and so it achieves very fast response times when Spark persists data in the 
> object store.
> The new connector supports speculative mode and covers various failure 
> scenarios (like two Spark tasks writing into the same object, partially 
> corrupted data due to runtime exceptions in the Spark master, etc.). It also 
> covers https://issues.apache.org/jira/browse/SPARK-10063 
> and other known issues.
> 
> The detailed algorithm for fault tolerance will be released very soon. For 
> now, those who are interested can view the implementation in the code itself.
> 
>  https://github.com/SparkTC/stocator 
> contains all the details how to setup 
> and use with Spark.
> 
> A series of tests showed that the new connector achieves a 70% improvement for 
> write operations from Spark to Swift and about a 30% improvement for read 
> operations from Swift into Spark (compared to the existing driver that 
> Spark uses to integrate with objects stored in Swift). 
> 
> There is ongoing work to add more coverage and fix some known bugs / 
> limitations.
> 
> All the best
> Gil
> 



Re: Save DataFrame to HBase

2016-04-24 Thread Benjamin Kim
Hi Daniel,

How did you get the Phoenix plugin to work? I have CDH 5.5.2 installed which 
comes with HBase 1.0.0 and Phoenix 4.5.2. Do you think this will work?

Thanks,
Ben

> On Apr 24, 2016, at 1:43 AM, Daniel Haviv <daniel.ha...@veracity-group.com> 
> wrote:
> 
> Hi,
> I tried saving DF to HBase using a hive table with hbase storage handler and 
> hiveContext but it failed due to a bug.
> 
> I was able to persist the DF to HBase using Apache Phoenix, which was pretty 
> simple.
> 
> Thank you.
> Daniel
> 
> On 21 Apr 2016, at 16:52, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
>> Has anyone found an easy way to save a DataFrame into HBase?
>> 
>> Thanks,
>> Ben
>> 
>> 
>> 



Spark 2.0+ Structured Streaming

2016-04-28 Thread Benjamin Kim
Can someone explain to me how the new Structured Streaming works in the 
upcoming Spark 2.0+? I’m a little hazy on how data will be stored and 
referenced if it can be queried and/or batch processed directly from streams, 
and on whether the data will be append-only or whether some sort of upsert 
capability will be available. This sounds similar to what AWS Kinesis is trying 
to achieve, but Kinesis retains data for only 24 hours by default. Am I close?

Thanks,
Ben



Re: Spark 2.0 Release Date

2016-04-28 Thread Benjamin Kim
Next Thursday is Databricks' webinar on Spark 2.0. If you are attending, I bet 
many are going to ask when the release will be. Last time they did this, Spark 
1.6 came out not too long afterward.

> On Apr 28, 2016, at 5:21 AM, Sean Owen  wrote:
> 
> I don't know if anyone has begun a firm discussion on dates, but there
> are >100 open issues and ~10 blockers, so still some work to do before
> code freeze, it looks like. My unofficial guess is mid June before
> it's all done.
> 
> On Thu, Apr 28, 2016 at 12:43 PM, Arun Patel  wrote:
>> A small request.
>> 
>> Would you mind providing an approximate date of Spark 2.0 release?  Is it
>> early May or Mid May or End of May?
>> 
>> Thanks,
>> Arun
> 
> 





Re: Save DataFrame to HBase

2016-04-27 Thread Benjamin Kim
Daniel,

If you can get the code snippet, that would be great! I’ve been trying to get 
it to work for me as well. The examples on the Phoenix website do not work for 
me. If you are willing, could you also include the setup you used to make 
Phoenix work with Spark?

Thanks,
Ben

> On Apr 27, 2016, at 11:46 AM, Paras sachdeva <paras.sachdeva11...@gmail.com> 
> wrote:
> 
> Hi Daniel,
> 
> Would you possibly be able to share the snippet of code you used?
> 
> Thank you.
> 
> On Wed, Apr 27, 2016 at 3:13 PM, Daniel Haviv <daniel.ha...@veracity-group.com> wrote:
> Hi Benjamin,
> Yes it should work.
> 
> Let me know if you need further assistance I might be able to get the code 
> I've used for that project.
> 
> Thank you.
> Daniel
> 
> On 24 Apr 2016, at 17:35, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
>> Hi Daniel,
>> 
>> How did you get the Phoenix plugin to work? I have CDH 5.5.2 installed which 
>> comes with HBase 1.0.0 and Phoenix 4.5.2. Do you think this will work?
>> 
>> Thanks,
>> Ben
>> 
>>> On Apr 24, 2016, at 1:43 AM, Daniel Haviv <daniel.ha...@veracity-group.com> wrote:
>>> 
>>> Hi,
>>> I tried saving DF to HBase using a hive table with hbase storage handler 
>>> and hiveContext but it failed due to a bug.
>>> 
>>> I was able to persist the DF to HBase using Apache Phoenix, which was pretty 
>>> simple.
>>> 
>>> Thank you.
>>> Daniel
>>> 
>>> On 21 Apr 2016, at 16:52, Benjamin Kim <bbuil...@gmail.com> wrote:
>>> 
>>>> Has anyone found an easy way to save a DataFrame into HBase?
>>>> 
>>>> Thanks,
>>>> Ben
>>>> 
>>>> 
>>>> 
>> 
> 



Re: Save DataFrame to HBase

2016-04-27 Thread Benjamin Kim
Hi Ted,

Do you know when the release will be? I also see some documentation for usage 
of the hbase-spark module at the hbase website. But, I don’t see an example on 
how to save data. There is only one for reading/querying data. Will this be 
added when the final version does get released?

Thanks,
Ben

> On Apr 21, 2016, at 6:56 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can do 
> this.
> 
> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Has anyone found an easy way to save a DataFrame into HBase?
> 
> Thanks,
> Ben
> 
> 
> 
> 



Convert DataFrame to Array of Arrays

2016-04-24 Thread Benjamin Kim
I have data in a DataFrame loaded from a CSV file. I need to load this data 
into HBase using an RDD formatted in a certain way.

val rdd = sc.parallelize(
  Array(
    key1,
      (ColumnFamily, ColumnName1, Value1),
      (ColumnFamily, ColumnName2, Value2),
      (…),
    key2,
      (ColumnFamily, ColumnName1, Value1),
      (ColumnFamily, ColumnName2, Value2),
      (…),
    …)
)

Can someone help me to iterate through each column in a row of data to build 
such an Array structure?

Thanks,
Ben
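
One way to approach the per-row iteration, as a minimal sketch in plain Scala. It produces a slightly different shape from the flat Array above: one (rowKey, cells) pair per row, which is usually easier to work with. HBaseCells, toCells, and the key column and column family names in the usage note are hypothetical, picked for illustration only.

```scala
object HBaseCells {
  // One HBase cell: (column family, column qualifier, value).
  type Cell = (String, String, String)

  // Turn one row into (rowKey, cells): the column named `keyColumn` supplies
  // the row key, every other non-null column becomes a cell in `family`.
  def toCells(columns: Seq[String], values: Seq[Any],
              keyColumn: String, family: String): (String, Seq[Cell]) = {
    val pairs = columns.zip(values)
    val key = pairs
      .collectFirst { case (name, v) if name == keyColumn && v != null => v.toString }
      .getOrElse(throw new IllegalArgumentException(s"no non-null column named $keyColumn"))
    val cells = pairs.collect {
      case (name, v) if name != keyColumn && v != null => (family, name, v.toString)
    }
    (key, cells)
  }
}
```

With a DataFrame df, this could be applied roughly as: val cols = df.columns.toSeq, then df.rdd.map(row => HBaseCells.toCells(cols, row.toSeq, "id", "cf")). Capturing cols outside the closure avoids dragging the DataFrame itself into the task.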



Spark Streaming S3 Error

2016-05-20 Thread Benjamin Kim
I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
Spark 1.6.0. It seems not to work. I keep getting this error.

Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:
    org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V @155: invokevirtual
  Reason:
    Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not assignable to 'org/jets3t/service/model/StorageObject'
  Current Frame:
    bci: @155
    flags: { }
    locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
    stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', integer }
  Bytecode:
0x000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
0x010: 5659 b701 5713 0192 b601 5b2b b601 5b13
0x020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
0x030: b400 7db6 00e7 b601 5bb6 015e b901 9802
0x040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
0x050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
0x060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
0x070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
0x080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
0x090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
0x0a0: 000a 4e2a 2d2b b700 c7b1   
  Exception Handler Table:
bci [0, 116] => handler: 162
bci [117, 159] => handler: 162
  Stackmap Table:
same_frame_extended(@65)
same_frame(@117)
same_locals_1_stack_item_frame(@162,Object[#139])
same_frame(@169)

    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
    at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
    at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
    at

Re: Spark Streaming S3 Error

2016-05-21 Thread Benjamin Kim
Ted,

I only see 1 jets3t-0.9.0 jar in the classpath after running this to list the 
jars.

val cl = ClassLoader.getSystemClassLoader
cl.asInstanceOf[java.net.URLClassLoader].getURLs.foreach(println)

/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/jars/jets3t-0.9.0.jar

I don’t know what else could be wrong.

Thanks,
Ben
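
A related check that might help narrow this down: rather than listing every jar, ask the JVM where a specific class was actually loaded from. This is a minimal sketch; WhichJar and locationOf are hypothetical names, and only the standard java.lang.Class and java.security APIs are used. It reports on the JVM it runs in (for example the driver in spark-shell), not on the executors.

```scala
object WhichJar {
  // Location (jar or directory) a class was loaded from, if known.
  // None when the class cannot be loaded or has no recorded code source
  // (for example, classes from the JVM's bootstrap loader).
  def locationOf(className: String): Option[String] =
    try {
      val cls = Class.forName(className)
      for {
        domain <- Option(cls.getProtectionDomain)
        source <- Option(domain.getCodeSource)
        url    <- Option(source.getLocation)
      } yield url.toString
    } catch {
      case _: ClassNotFoundException | _: LinkageError => None
    }
}
```

Running it for the classes named in the error, such as WhichJar.locationOf("org.jets3t.service.model.S3Object"), WhichJar.locationOf("org.jets3t.service.model.StorageObject"), and WhichJar.locationOf("org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore"), would show whether they come from the jars you expect.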

> On May 21, 2016, at 4:18 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Maybe more than one version of jets3t-xx.jar was on the classpath.
> 
> FYI
> 
> On Fri, May 20, 2016 at 8:31 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
> Spark 1.6.0. It seems not to work. I keep getting this error.
> 

Re: Spark Streaming S3 Error

2016-05-21 Thread Benjamin Kim
I got my answer.

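The fix was to access S3 through the s3a connector instead of the jets3t-based 
NativeS3FileSystem shown in the stack trace.

// sc is the SparkContext and ssc the StreamingContext built on it
val hadoopConf = sc.hadoopConfiguration
// credentials for the S3A connector
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)

// read through the s3a:// scheme
val lines = ssc.textFileStream("s3a://amg-events-out/")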

This worked.

Cheers,
Ben


> On May 21, 2016, at 4:18 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Maybe more than one version of jets3t-xx.jar was on the classpath.
> 
> FYI
> 
> On Fri, May 20, 2016 at 8:31 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
> Spark 1.6.0. It seems not to work. I keep getting this error.
> 

Re: Save DataFrame to HBase

2016-05-10 Thread Benjamin Kim
Ted,

Will the hbase-spark module allow for creating tables in Spark SQL that 
reference the hbase tables underneath? In this way, users can query using just 
SQL.

Thanks,
Ben

> On Apr 28, 2016, at 3:09 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Hbase 2.0 release likely would come after Spark 2.0 release. 
> 
> There're other features being developed in hbase 2.0
> I am not sure when hbase 2.0 would be released. 
> 
> The refguide is incomplete. 
> Zhan has assigned the doc JIRA to himself. The documentation would be done 
> after fixing bugs in hbase-spark module. 
> 
> Cheers
> 
> On Apr 27, 2016, at 10:31 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
>> Hi Ted,
>> 
>> Do you know when the release will be? I also see some documentation for 
>> usage of the hbase-spark module at the hbase website. But, I don’t see an 
>> example on how to save data. There is only one for reading/querying data. 
>> Will this be added when the final version does get released?
>> 
>> Thanks,
>> Ben
>> 
>>> On Apr 21, 2016, at 6:56 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>> 
>>> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can 
>>> do this.
>>> 
>>> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>> Has anyone found an easy way to save a DataFrame into HBase?
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> 
>>> 
>> 



Re: Structured Streaming in Spark 2.0 and DStreams

2016-05-16 Thread Benjamin Kim
I have a question out of curiosity. These forever/unlimited DataFrames/Datasets 
will persist and be queryable. I am still foggy about how this data will be 
stored. As far as I know, memory is finite. Will the data be spilled to disk 
and be retrievable if the query spans data not in memory? Is Tachyon (Alluxio), 
HDFS (Parquet), NoSQL (HBase, Cassandra), RDBMS (PostgreSQL, MySQL), an object 
store (S3, Swift), or anything else I can’t think of going to be the underlying 
near real-time storage system?

Thanks,
Ben

> On May 15, 2016, at 3:36 PM, Yuval Itzchakov  wrote:
> 
> Hi Ofir,
> Thanks for the elaborate answer. I have read both documents, and they touch 
> lightly on infinite DataFrames/Datasets. However, they do not go into depth 
> on how existing transformations on DStreams, for example, will be 
> translated into the Dataset APIs. I've been browsing the 2.0 branch and have 
> yet to understand how they correlate.
> 
> Also, placing SparkSession in the sql package seems like a peculiar choice, 
> since this is going to be the global abstraction over 
> SparkContext/StreamingContext from now on.
> 
> On Sun, May 15, 2016, 23:42 Ofir Manor  > wrote:
> Hi Yuval,
> let me share my understanding based on similar questions I had.
> First, Spark 2.x aims to replace a whole bunch of its APIs with just two main 
> ones - SparkSession (replacing Hive/SQL/Spark Context) and Dataset (merging 
> of Dataset and Dataframe - which is why it inherits all the SparkSQL 
> goodness), while RDD seems as a low-level API only for special cases. The new 
> Dataset should also support both batch and streaming - replacing (eventually) 
> DStream as well. See the design docs in SPARK-13485 (unified API) and 
> SPARK-8360 (StructuredStreaming) for a good intro. 
> However, as you noted, not all will be fully delivered in 2.0. For example, 
> it seems that streaming from / to Kafka using StructuredStreaming didn't make 
> it (so far?) to 2.0 (which is a showstopper for me). 
> Anyway, as far as I understand, you should be able to apply stateful 
> operators (non-RDD) on Datasets (for example, the new event-time window 
> processing SPARK-8360). The gap I see is mostly limited streaming sources / 
> sinks migrated to the new (richer) API and semantics.
> Anyway, I'm pretty sure once 2.0 gets to RC, the documentation and examples 
> will align with the current offering...
> 
> 
> Ofir Manor
> 
> Co-Founder & CTO | Equalum
> 
> 
> Mobile: +972-54-7801286  | Email: 
> ofir.ma...@equalum.io 
> On Sun, May 15, 2016 at 1:52 PM, Yuval.Itzchakov  > wrote:
> I've been reading/watching videos about the upcoming Spark 2.0 release which
> brings us Structured Streaming. One thing I've yet to understand is how this
> relates to the current state of working with Streaming in Spark with the
> DStream abstraction.
> 
> All examples I can find, in the Spark repository/different videos is someone
> streaming local JSON files or reading from HDFS/S3/SQL. Also, when browsing
> the source, SparkSession seems to be defined inside org.apache.spark.sql, so
> this gives me a hunch that this is somehow all related to SQL and the likes,
> and not really to DStreams.
> 
> What I'm failing to understand is: Will this feature impact how we do
> Streaming today? Will I be able to consume a Kafka source in a streaming
> fashion (like we do today when we open a stream using KafkaUtils)? Will we
> be able to do state-full operations on a Dataset[T] like we do today using
> MapWithStateRDD? Or will there be a subset of operations that the catalyst
> optimizer can understand such as aggregate and such?
> 
> I'd be happy anyone could shed some light on this.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-in-Spark-2-0-and-DStreams-tp26959.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 
> 



Re: Structured Streaming in Spark 2.0 and DStreams

2016-05-15 Thread Benjamin Kim
Hi Ofir,

I just recently saw the webinar with Reynold Xin. He mentioned the Spark 
Session unification efforts, but I don’t remember him mentioning the Dataset 
for Structured Streaming, or Continuous Applications as he put it. He did 
mention streaming or unlimited DataFrames for Structured Streaming, so that one 
can directly query the data from them. Has something changed since then?

Thanks,
Ben


> On May 15, 2016, at 1:42 PM, Ofir Manor  wrote:
> 
> Hi Yuval,
> let me share my understanding based on similar questions I had.
> First, Spark 2.x aims to replace a whole bunch of its APIs with just two main 
> ones - SparkSession (replacing Hive/SQL/Spark Context) and Dataset (merging 
> of Dataset and Dataframe - which is why it inherits all the SparkSQL 
> goodness), while RDD seems as a low-level API only for special cases. The new 
> Dataset should also support both batch and streaming - replacing (eventually) 
> DStream as well. See the design docs in SPARK-13485 (unified API) and 
> SPARK-8360 (StructuredStreaming) for a good intro. 
> However, as you noted, not all will be fully delivered in 2.0. For example, 
> it seems that streaming from / to Kafka using StructuredStreaming didn't make 
> it (so far?) to 2.0 (which is a showstopper for me). 
> Anyway, as far as I understand, you should be able to apply stateful 
> operators (non-RDD) on Datasets (for example, the new event-time window 
> processing SPARK-8360). The gap I see is mostly limited streaming sources / 
> sinks migrated to the new (richer) API and semantics.
> Anyway, I'm pretty sure once 2.0 gets to RC, the documentation and examples 
> will align with the current offering...
> 
> 
> Ofir Manor
> 
> Co-Founder & CTO | Equalum
> 
> 
> Mobile: +972-54-7801286  | Email: 
> ofir.ma...@equalum.io 
> On Sun, May 15, 2016 at 1:52 PM, Yuval.Itzchakov  > wrote:
> I've been reading/watching videos about the upcoming Spark 2.0 release which
> brings us Structured Streaming. One thing I've yet to understand is how this
> relates to the current state of working with Streaming in Spark with the
> DStream abstraction.
> 
> All examples I can find, in the Spark repository/different videos is someone
> streaming local JSON files or reading from HDFS/S3/SQL. Also, when browsing
> the source, SparkSession seems to be defined inside org.apache.spark.sql, so
> this gives me a hunch that this is somehow all related to SQL and the likes,
> and not really to DStreams.
> 
> What I'm failing to understand is: Will this feature impact how we do
> Streaming today? Will I be able to consume a Kafka source in a streaming
> fashion (like we do today when we open a stream using KafkaUtils)? Will we
> be able to do state-full operations on a Dataset[T] like we do today using
> MapWithStateRDD? Or will there be a subset of operations that the catalyst
> optimizer can understand such as aggregate and such?
> 
> I'd be happy anyone could shed some light on this.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-in-Spark-2-0-and-DStreams-tp26959.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: Structured Streaming in Spark 2.0 and DStreams

2016-05-15 Thread Benjamin Kim
Ofir,

Thanks for the clarification. I was confused for a moment. The links will be 
very helpful.


> On May 15, 2016, at 2:32 PM, Ofir Manor <ofir.ma...@equalum.io> wrote:
> 
> Ben,
> I'm just a Spark user - but at least in March Spark Summit, that was the main 
> term used.
> Taking a step back from the details, maybe this new post from Reynold is a 
> better intro to Spark 2.0 highlights 
> https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html
>  
> <https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html>
> 
> If you want to drill down, go to SPARK-8360 "Structured Streaming (aka 
> Streaming DataFrames)". The design doc (written by Reynold in March) is very 
> readable:
>  https://issues.apache.org/jira/browse/SPARK-8360 
> <https://issues.apache.org/jira/browse/SPARK-8360>
> 
> Regarding directly querying (SQL) the state managed by a streaming process - 
> I don't know if that will land in 2.0 or only later.
> 
> Hope that helps,
> 
> Ofir Manor
> 
> Co-Founder & CTO | Equalum
> 
> 
> Mobile: +972-54-7801286 <tel:%2B972-54-7801286> | Email: 
> ofir.ma...@equalum.io <mailto:ofir.ma...@equalum.io>
> On Sun, May 15, 2016 at 11:58 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Ofir,
> 
> I just recently saw the webinar with Reynold Xin. He mentioned the Spark 
> Session unification efforts, but I don’t remember the DataSet for Structured 
> Streaming aka Continuous Applications as he put it. He did mention streaming 
> or unlimited DataFrames for Structured Streaming so one can directly query 
> the data from it. Has something changed since then?
> 
> Thanks,
> Ben
> 
> 
>> On May 15, 2016, at 1:42 PM, Ofir Manor <ofir.ma...@equalum.io 
>> <mailto:ofir.ma...@equalum.io>> wrote:
>> 
>> Hi Yuval,
>> let me share my understanding based on similar questions I had.
>> First, Spark 2.x aims to replace a whole bunch of its APIs with just two 
>> main ones - SparkSession (replacing Hive/SQL/Spark Context) and Dataset 
>> (merging of Dataset and Dataframe - which is why it inherits all the 
>> SparkSQL goodness), while RDD seems to remain a low-level API only for special 
>> cases. The new Dataset should also support both batch and streaming - 
>> replacing (eventually) DStream as well. See the design docs in SPARK-13485 
>> (unified API) and SPARK-8360 (StructuredStreaming) for a good intro. 
>> However, as you noted, not all will be fully delivered in 2.0. For example, 
>> it seems that streaming from / to Kafka using StructuredStreaming didn't 
>> make it (so far?) to 2.0 (which is a showstopper for me). 
>> Anyway, as far as I understand, you should be able to apply stateful 
>> operators (non-RDD) on Datasets (for example, the new event-time window 
>> processing SPARK-8360). The gap I see is mostly limited streaming sources / 
>> sinks migrated to the new (richer) API and semantics.
>> Anyway, I'm pretty sure once 2.0 gets to RC, the documentation and examples 
>> will align with the current offering...
>> 
>> 
>> Ofir Manor
>> 
>> Co-Founder & CTO | Equalum
>> 
>> 
>> Mobile: +972-54-7801286 <tel:%2B972-54-7801286> | Email: 
>> ofir.ma...@equalum.io <mailto:ofir.ma...@equalum.io>
>> On Sun, May 15, 2016 at 1:52 PM, Yuval.Itzchakov <yuva...@gmail.com 
>> <mailto:yuva...@gmail.com>> wrote:
>> I've been reading/watching videos about the upcoming Spark 2.0 release which
>> brings us Structured Streaming. One thing I've yet to understand is how this
>> relates to the current state of working with Streaming in Spark with the
>> DStream abstraction.
>> 
>> All examples I can find, in the Spark repository/different videos is someone
>> streaming local JSON files or reading from HDFS/S3/SQL. Also, when browsing
>> the source, SparkSession seems to be defined inside org.apache.spark.sql, so
>> this gives me a hunch that this is somehow all related to SQL and the likes,
>> and not really to DStreams.
>> 
>> What I'm failing to understand is: Will this feature impact how we do
>> Streaming today? Will I be able to consume a Kafka source in a streaming
>> fashion (like we do today when we open a stream using KafkaUtils)? Will we
>> be able to do stateful operations on a Dataset[T] like we do today using
>> MapWithStateRDD? Or will there be a subset of operations that the catalyst
>> optimizer can understand such as aggregate and such?
>> 
>> I'd be happy anyone could s

Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Hi Hyukjin,

I saw that. I don’t know how to use it. I’m still learning Scala on my own. Can 
you help me to start?

Thanks,
Ben

> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> I hope it was not too late :).
> 
> It is possible.
> 
> Please check csvRdd api here, 
> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>  
> <https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150>.
> 
> Thanks!
> 
> On 2 Apr 2016 2:47 a.m., "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
> data strings. Each string representing the header row and multiple rows of 
> data along with delimiters. I would like to feed each thru a CSV parser to 
> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
> with this data.
> 
> Please let me know if you have any ideas.
> 
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 



Re: JSON Usage

2016-04-15 Thread Benjamin Kim
Holden,

If I were to use DataSets, then I would essentially do this:

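import scala.collection.JavaConverters._

val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
for (message <- messages.asScala) {
  // read.json(String) expects a path, so wrap the message body in an RDD[String]
  val files = sqlContext.read.json(sc.parallelize(message.getBody() :: Nil))
}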

Can I simply do files.toDS(), or do I have to define a case class File and 
apply it with as[File]? If I have to apply a schema, how would I create it 
based on the JSON structure below, especially the nested elements?
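
In case it helps frame the question, here is a rough sketch of the case classes 
I imagine I would need. It only covers the bucket name and object key. The 
names Bucket, S3Object, S3Entity, Record, Notification and the helper locations 
are made up by me, and whether as[Notification] would resolve these nested 
fields by name is an assumption I have not tested.

```scala
// Hypothetical case classes mirroring only the fields needed from the S3 event JSON.
case class Bucket(name: String)
case class S3Object(key: String)
// "object" is a Scala keyword, so the field name needs backticks.
case class S3Entity(bucket: Bucket, `object`: S3Object)
case class Record(s3: S3Entity)
case class Notification(Records: Seq[Record])

object NotificationSketch {
  // Walks the nested structure and returns one (bucket, key) pair per record.
  def locations(n: Notification): Seq[(String, String)] =
    n.Records.map(r => (r.s3.bucket.name, r.s3.`object`.key))
}
```

The idea would be that each nesting level in the JSON gets its own case class, 
and fields I do not care about are simply left out.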

Thanks,
Ben


> On Apr 14, 2016, at 3:46 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> 
> You could certainly use RDDs for that, you might also find using Dataset 
> selecting the fields you need to construct the URL to fetch and then using 
> the map function to be easier.
> 
> On Thu, Apr 14, 2016 at 12:01 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I was wondering what would be the best way to use JSON in Spark/Scala. I need to 
> look up values of fields in a collection of records to form a URL and download 
> that file at that location. I was thinking an RDD would be perfect for this. 
> I just want to hear from others who might have more experience in this. Below 
> is the actual JSON structure that I am trying to use for the S3 bucket and 
> key values of each “record" within “Records".
> 
> {
>"Records":[
>   {
>  "eventVersion":"2.0",
>  "eventSource":"aws:s3",
>  "awsRegion":"us-east-1",
>  "eventTime":The time, in ISO-8601 format, for example, 
> 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
>  "eventName":"event-type",
>  "userIdentity":{
> 
> "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
>  },
>  "requestParameters":{
> "sourceIPAddress":"ip-address-where-request-came-from"
>  },
>  "responseElements":{
> "x-amz-request-id":"Amazon S3 generated request ID",
> "x-amz-id-2":"Amazon S3 host that processed the request"
>  },
>  "s3":{
> "s3SchemaVersion":"1.0",
> "configurationId":"ID found in the bucket notification 
> configuration",
> "bucket":{
>"name":"bucket-name",
>"ownerIdentity":{
>   "principalId":"Amazon-customer-ID-of-the-bucket-owner"
>},
>"arn":"bucket-ARN"
> },
> "object":{
>"key":"object-key",
>"size":object-size,
>"eTag":"object eTag",
>"versionId":"object version if bucket is versioning-enabled, 
> otherwise null",
>"sequencer": "a string representation of a hexadecimal value 
> used to determine event sequence,
>only used with PUTs and DELETEs"
> }
>  }
>   },
>   {
>   // Additional events
>   }
>]
> }
> 
> Thanks
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 
> 
> 
> -- 
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau <https://twitter.com/holdenkarau>


JSON Usage

2016-04-14 Thread Benjamin Kim
I was wondering what would be the best way to use JSON in Spark/Scala. I need 
to look up values of fields in a collection of records to form a URL and 
download the file at that location. I was thinking an RDD would be perfect for 
this. I just want to hear from others who might have more experience in this. 
Below is the actual JSON structure that I am trying to use for the S3 bucket 
and key values of each "record" within "Records".

{
   "Records":[
      {
         "eventVersion":"2.0",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
         "eventName":"event-type",
         "userIdentity":{
            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
         },
         "requestParameters":{
            "sourceIPAddress":"ip-address-where-request-came-from"
         },
         "responseElements":{
            "x-amz-request-id":"Amazon S3 generated request ID",
            "x-amz-id-2":"Amazon S3 host that processed the request"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"ID found in the bucket notification configuration",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
               },
               "arn":"bucket-ARN"
            },
            "object":{
               "key":"object-key",
               "size":object-size,
               "eTag":"object eTag",
               "versionId":"object version if bucket is versioning-enabled, otherwise null",
               "sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
            }
         }
      },
      {
         // Additional events
      }
   ]
}

Thanks
Ben



Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Is this right?


import com.databricks.spark.csv

val csvRdd = data.flatMap(x => x.split("\n"))
val df = new CsvParser().csvRdd(sqlContext, csvRdd, useHeader = true)

Thanks,
Ben


> On Apr 15, 2016, at 1:14 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> Hi,
> 
> Would you try the code below?
> 
> val csvRDD = ...your processing for csv rdd..
> val df = new CsvParser().csvRdd(sqlContext, csvRDD, useHeader = true)
> 
> Thanks!
> 
> On 16 Apr 2016 1:35 a.m., "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Hyukjin,
> 
> I saw that. I don’t know how to use it. I’m still learning Scala on my own. 
> Can you help me to start?
> 
> Thanks,
> Ben
> 
>> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon <gurwls...@gmail.com 
>> <mailto:gurwls...@gmail.com>> wrote:
>> 
>> I hope it was not too late :).
>> 
>> It is possible.
>> 
>> Please check csvRdd api here, 
>> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>>  
>> <https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150>.
>> 
>> Thanks!
>> 
>> On 2 Apr 2016 2:47 a.m., "Benjamin Kim" <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
>> data strings. Each string representing the header row and multiple rows of 
>> data along with delimiters. I would like to feed each thru a CSV parser to 
>> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
>> with this data.
>> 
>> Please let me know if you have any ideas.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> <mailto:user-h...@spark.apache.org>
>> 
> 



Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Thanks!

I got this to work.

val csvRdd = sc.parallelize(data.split("\n"))
val parser = new com.databricks.spark.csv.CsvParser().withUseHeader(true).withInferSchema(true)
val df = parser.csvRdd(sqlContext, csvRdd)
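
For my original case, where several strings each carry their own header row, 
something like the following might flatten them before handing the lines to 
sc.parallelize and csvRdd. CsvLines and flatten are names I made up, and the 
sketch assumes every string has the same header and that no field contains an 
embedded newline.

```scala
// Hypothetical helper, plain Scala (no Spark needed): merges several CSV blocks
// that each start with the same header into one list of lines with one header.
object CsvLines {
  def flatten(blocks: Seq[String]): Seq[String] = {
    // Split every block into trimmed, non-empty lines and drop empty blocks.
    val perBlock = blocks
      .map(_.split("\n").map(_.trim).filter(_.nonEmpty).toSeq)
      .filter(_.nonEmpty)
    perBlock.headOption match {
      case None        => Seq.empty
      // Keep the first block's header, then append the data rows of every block.
      case Some(first) => first.head +: perBlock.flatMap(_.tail)
    }
  }
}
```

The result could then be passed to sc.parallelize in place of data.split("\n").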

> On Apr 15, 2016, at 1:14 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> Hi,
> 
> Would you try the code below?
> 
> val csvRDD = ...your processing for csv rdd..
> val df = new CsvParser().csvRdd(sqlContext, csvRDD, useHeader = true)
> 
> Thanks!
> 
> On 16 Apr 2016 1:35 a.m., "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Hyukjin,
> 
> I saw that. I don’t know how to use it. I’m still learning Scala on my own. 
> Can you help me to start?
> 
> Thanks,
> Ben
> 
>> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon <gurwls...@gmail.com 
>> <mailto:gurwls...@gmail.com>> wrote:
>> 
>> I hope it was not too late :).
>> 
>> It is possible.
>> 
>> Please check csvRdd api here, 
>> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>>  
>> <https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150>.
>> 
>> Thanks!
>> 
>> On 2 Apr 2016 2:47 a.m., "Benjamin Kim" <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
>> data strings. Each string representing the header row and multiple rows of 
>> data along with delimiters. I would like to feed each thru a CSV parser to 
>> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
>> with this data.
>> 
>> Please let me know if you have any ideas.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> <mailto:user-h...@spark.apache.org>
>> 
> 



Re: JSON Usage

2016-04-17 Thread Benjamin Kim
Hyukjin,

This is what I have done so far. I haven’t used DataSet yet, and maybe I don’t need to.

var df: DataFrame = null
for (message <- messages) {
  val bodyRdd = sc.parallelize(message.getBody() :: Nil)
  val fileDf = sqlContext.read.json(bodyRdd)
    .select(
      $"Records.s3.bucket.name".as("bucket"),
      $"Records.s3.object.key".as("key")
    )
  if (df != null) {
    df = df.unionAll(fileDf)
  } else {
    df = fileDf
  }
}
df.show

Each result is returned as an array. I just need to concatenate them together 
to make the S3 URL, and download the files per URL. This I need help with next.
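
Roughly what I have in mind for the URL part, as a sketch only. S3Urls, s3Url 
and toUrls are names I made up (they are not part of Spark or the AWS SDK), the 
s3:// prefix is a placeholder for whatever scheme applies (s3n, s3a, https), 
and I am assuming the keys arrive URL-encoded in the notification, so they are 
decoded first.

```scala
import java.net.URLDecoder

// Hypothetical helper: pairs the bucket and key arrays from one row and
// builds one URL per (bucket, key) pair.
object S3Urls {
  // Assumption: keys are URL-encoded in the event, so decode before use.
  def s3Url(bucket: String, key: String): String =
    s"s3://$bucket/${URLDecoder.decode(key, "UTF-8")}"

  def toUrls(buckets: Seq[String], keys: Seq[String]): Seq[String] = {
    require(buckets.length == keys.length, "expected one key per bucket")
    buckets.zip(keys).map { case (bucket, key) => s3Url(bucket, key) }
  }
}
```

Since the bucket and key columns each hold an array per row, the two arrays of 
a row would be read out (for example with Row.getSeq) and passed to toUrls 
together.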

Thanks,
Ben

> On Apr 17, 2016, at 7:38 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> Hi!
> 
> Personally, I don't think it necessarily needs to be DataSet for your goal.
> 
> Just select your data at "s3" from DataFrame loaded by sqlContext.read.json().
> 
> You can try to printSchema() to check the nested schema and then select the 
> data.
> 
> Also, I guess (from your code) you are trying to send a request, fetch the 
> response on the driver side, and then send each message to the executor side. 
> I guess there would be really heavy overhead on the driver side.
> Holden,
> 
> If I were to use DataSets, then I would essentially do this:
> 
> val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
> val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
> for (message <- messages.asScala) {
> val files = sqlContext.read.json(message.getBody())
> }
> 
> Can I simply do files.toDS() or do I have to create a schema using a case 
> class File and apply it as[File]? If I have to apply a schema, then how would 
> I create it based on the JSON structure below, especially the nested elements.
> 
> Thanks,
> Ben
> 
> 
>> On Apr 14, 2016, at 3:46 PM, Holden Karau <hol...@pigscanfly.ca 
>> <mailto:hol...@pigscanfly.ca>> wrote:
>> 
>> You could certainly use RDDs for that, you might also find using Dataset 
>> selecting the fields you need to construct the URL to fetch and then using 
>> the map function to be easier.
>> 
>> On Thu, Apr 14, 2016 at 12:01 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> I was wondering what would be the best way to use JSON in Spark/Scala. I need 
>> to look up values of fields in a collection of records to form a URL and 
>> download that file at that location. I was thinking an RDD would be perfect 
>> for this. I just want to hear from others who might have more experience in 
>> this. Below is the actual JSON structure that I am trying to use for the S3 
>> bucket and key values of each “record" within “Records".
>> 
>> {
>>"Records":[
>>   {
>>  "eventVersion":"2.0",
>>  "eventSource":"aws:s3",
>>  "awsRegion":"us-east-1",
>>  "eventTime":The time, in ISO-8601 format, for example, 
>> 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
>>  "eventName":"event-type",
>>  "userIdentity":{
>> 
>> "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
>>  },
>>  "requestParameters":{
>> "sourceIPAddress":"ip-address-where-request-came-from"
>>  },
>>  "responseElements":{
>> "x-amz-request-id":"Amazon S3 generated request ID",
>> "x-amz-id-2":"Amazon S3 host that processed the request"
>>  },
>>  "s3":{
>> "s3SchemaVersion":"1.0",
>> "configurationId":"ID found in the bucket notification 
>> configuration",
>> "bucket":{
>>"name":"bucket-name",
>>"ownerIdentity":{
>>   "principalId":"Amazon-customer-ID-of-the-bucket-owner"
>>},
>>"arn":"bucket-ARN"
>> },
>> "object":{
>>"key":"object-key",
>>"size":object-size,
>>"eTag":"object eTag",
>>"versionId":"object version if bucket is versioning-enabled, 
>> otherwise null",
>>"sequencer": "a string representation of a hexadecimal value 
>> used to determine event sequence,
>>only used with PUTs and DELETEs"
>> }
>>  }
>>   },
>>   {
>>   // Additional events
>>   }
>>]
>> }
>> 
>> Thanks
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> <mailto:user-h...@spark.apache.org>
>> 
>> 
>> 
>> 
>> -- 
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau <https://twitter.com/holdenkarau>



HBase Spark Module

2016-04-20 Thread Benjamin Kim
I see that the new CDH 5.7 has been released with the HBase Spark module 
built-in. I was wondering if I could just download it and use the hbase-spark 
jar file for CDH 5.5. Has anyone tried this yet?

Thanks,
Ben



Save DataFrame to HBase

2016-04-21 Thread Benjamin Kim
Has anyone found an easy way to save a DataFrame into HBase?

Thanks,
Ben





Re: Save DataFrame to HBase

2016-04-21 Thread Benjamin Kim
Hi Ted,

Can this module be used with an older version of HBase, such as 1.0 or 1.1? 
Where can I get the module from?

Thanks,
Ben

> On Apr 21, 2016, at 6:56 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can do 
> this.
> 
> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone found an easy way to save a DataFrame into HBase?
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



Re: How to connect HBase and Spark using Python?

2016-07-22 Thread Benjamin Kim
It is included in Cloudera’s CDH 5.8.

> On Jul 22, 2016, at 6:13 PM, Mail.com  wrote:
> 
> Hbase Spark module will be available with Hbase 2.0. Is that out yet?
> 
>> On Jul 22, 2016, at 8:50 PM, Def_Os  wrote:
>> 
>> So it appears it should be possible to use HBase's new hbase-spark module, if
>> you follow this pattern:
>> https://hbase.apache.org/book.html#_sparksql_dataframes
>> 
>> Unfortunately, when I run my example from PySpark, I get the following
>> exception:
>> 
>> 
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o120.save.
>>> : java.lang.RuntimeException: org.apache.hadoop.hbase.spark.DefaultSource
>>> does not allow create table as select.
>>>   at scala.sys.package$.error(package.scala:27)
>>>   at
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:259)
>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>   at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>>>   at py4j.Gateway.invoke(Gateway.java:259)
>>>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>   at py4j.GatewayConnection.run(GatewayConnection.java:209)
>>>   at java.lang.Thread.run(Thread.java:745)
>> 
>> Even when I created the table in HBase first, it still failed.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-connect-HBase-and-Spark-using-Python-tp27372p27397.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 





Re: Spark Website

2016-07-13 Thread Benjamin Kim
It takes me to the directories instead of the webpage.

> On Jul 13, 2016, at 11:45 AM, manish ranjan <cse1.man...@gmail.com> wrote:
> 
> working for me. What do you mean 'as supposed to'?
> 
> ~Manish
> 
> 
> 
> On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone noticed that the spark.apache.org <http://spark.apache.org/> is 
> not working as supposed to?
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Spark Website

2016-07-13 Thread Benjamin Kim
Has anyone noticed that the spark.apache.org is not working as supposed to?





Re: transtition SQLContext to SparkSession

2016-07-18 Thread Benjamin Kim
From what I read, there are no more Contexts.

"SparkContext, SQLContext, HiveContext merged into SparkSession"

I have not tested it, so I don’t know if it’s true.
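
Going by the 2.0 API docs, a sketch like the one below is how I would expect it 
to look. It assumes the final 2.0 release, where, as far as I can tell, the 
SparkContext and a SQLContext are still reachable from the session rather than 
gone entirely. SessionSketch, countRange and the local master are just for 
illustration.

```scala
import org.apache.spark.sql.SparkSession

object SessionSketch {
  // Counts the rows of a small generated range using a locally created session.
  def countRange(n: Long): Long = {
    val spark = SparkSession.builder()
      .master("local[*]")          // illustration only; normally set by spark-submit
      .appName("session-sketch")
      .getOrCreate()
    try {
      val sc = spark.sparkContext  // the underlying SparkContext
      val sqlc = spark.sqlContext  // SQLContext kept around for older code paths
      spark.range(n).count()
    } finally {
      spark.stop()
    }
  }
}
```

If that holds, older code could keep taking a SQLContext while new code moves 
to the session, with both backed by the same SparkContext.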

Cheers,
Ben


> On Jul 18, 2016, at 8:37 AM, Koert Kuipers  wrote:
> 
> in my codebase i would like to gradually transition to SparkSession, so while 
> i start using SparkSession i also want a SQLContext to be available as before 
> (but with a deprecated warning when i use it). this should be easy since 
> SQLContext is now a wrapper for SparkSession.
> 
> so basically:
> val session = SparkSession.builder.set(..., ...).getOrCreate()
> val sqlc = new SQLContext(session)
> 
> however this doesnt work, the SQLContext constructor i am trying to use is 
> private. SparkSession.sqlContext is also private.
> 
> am i missing something?
> 
> a non-gradual switch is not very realistic in any significant codebase, and i 
> do not want to create SparkSession and SQLContext independendly (both from 
> same SparkContext) since that can only lead to confusion and inconsistent 
> settings.



SnappyData and Structured Streaming

2016-07-05 Thread Benjamin Kim
I recently got a sales email from SnappyData, and after reading the 
documentation about what they offer, it sounds very similar to what Structured 
Streaming will offer w/o the underlying in-memory, spill-to-disk, CRUD 
compliant data storage in SnappyData. I was wondering if Structured Streaming 
is trying to achieve the same on its own or is SnappyData contributing 
Streaming extensions that they built to the Spark project. Lastly, what does 
the Spark community think of this so-called “Spark Data Store”?

Thanks,
Ben



Re: SnappyData and Structured Streaming

2016-07-06 Thread Benjamin Kim
Jags,

Thanks for the details. This makes things much clearer. I saw in the Spark 
roadmap that version 2.1 will add the SQL capabilities mentioned here. It looks 
like the Spark community is gradually coming to the same conclusions that the 
SnappyData folks came to a while back in terms of Streaming. But, there is 
always the need for a better way to store data underlying Spark. The State 
Store information was informative too. I can envision that it can use this data 
store too if need be.

Thanks again,
Ben

> On Jul 6, 2016, at 8:52 AM, Jags Ramnarayan <jramnara...@snappydata.io> wrote:
> 
> The plan is to fully integrate with the new structured streaming API and 
> implementation in an upcoming release. But, we will continue offering several 
> extensions. Few noted below ...
> 
> - the store (streaming sink) will offer a lot more capabilities like 
> transactions, replicated tables, partitioned row and column oriented tables 
> to suit different types of workloads. 
> - While streaming API(scala) in snappydata itself will change a bit to become 
> fully compatible with structured streaming(SchemaDStream will go away), we 
> will continue to offer SQL support for streams so they can be managed from 
> external clients (JDBC, ODBC), their partitions can share the same 
> partitioning strategy as the underlying table where it might be stored, and 
> even registrations of continuous queries from remote clients. 
> 
> While building streaming apps using the Spark APi offers tremendous 
> flexibility we also want to make it simple for apps to work with streams just 
> using SQL. For instance, you should be able to declaratively specify a table 
> as a sink to a stream(i.e. using SQL). For example, you can specify a "TopK 
> Table" (a built in special table for topK analytics using probabilistic data 
> structures) as a sink for a high velocity time series stream like this - 
> "create topK table MostPopularTweets on tweetStreamTable " +
> "options(key 'hashtag', frequencyCol 'retweets', timeSeriesColumn 
> 'tweetTime' )" 
> where 'tweetStreamTable' is created using the 'create stream table ...' SQL 
> syntax. 
> 
> 
> -
> Jags
> SnappyData blog <http://www.snappydata.io/blog>
> Download binary, source <https://github.com/SnappyDataInc/snappydata>
> 
> 
> On Wed, Jul 6, 2016 at 8:02 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Jags,
> 
> I should have been more specific. I am referring to what I read at 
> http://snappydatainc.github.io/snappydata/streamingWithSQL/ 
> <http://snappydatainc.github.io/snappydata/streamingWithSQL/>, especially the 
> Streaming Tables part. It roughly coincides with the Streaming DataFrames 
> outlined here 
> https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h
>  
> <https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h>.
>  I don’t if I’m wrong, but they both sound very similar. That’s why I posed 
> this question.
> 
> Thanks,
> Ben
> 
>> On Jul 6, 2016, at 7:03 AM, Jags Ramnarayan <jramnara...@snappydata.io 
>> <mailto:jramnara...@snappydata.io>> wrote:
>> 
>> Ben,
>>Note that Snappydata's primary objective is to be a distributed in-memory 
>> DB for mixed workloads (i.e. streaming with transactions and analytic 
>> queries). On the other hand, Spark, till date, is primarily designed as a 
>> processing engine over myriad storage engines (SnappyData being one). So, 
>> the marriage is quite complementary. The difference compared to other stores 
>> is that SnappyData realizes its solution by deeply integrating and 
>> collocating with Spark (i.e. share spark executor memory/resources with the 
>> store) avoiding serializations and shuffle in many situations.
>> 
>> On your specific thought about being similar to Structured streaming, a 
>> better discussion could be a comparison to the recently introduced State 
>> store 
>> <https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7>
>>  (perhaps this is what you meant). 
>> It proposes a KV store for streaming aggregations with support for updates. 
>> The proposed API will, at some point, be pluggable so vendors can easily 
>> support alternate implementations to storage, not just HDFS(default store in 
>> proposed State store). 
>> 
>> 
>> -
>> Jags
>> SnappyData blog <http://www.snappydata.io/blog>
>> Download binary, source <https://github.com/SnappyDataInc/snappydata>

Re: SnappyData and Structured Streaming

2016-07-06 Thread Benjamin Kim
Jags,

I should have been more specific. I am referring to what I read at 
http://snappydatainc.github.io/snappydata/streamingWithSQL/, especially the 
Streaming Tables part. It roughly coincides with the Streaming DataFrames 
outlined here 
https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h.
I don’t know if I’m wrong, but they both sound very similar. That’s why I posed 
this question.

Thanks,
Ben

> On Jul 6, 2016, at 7:03 AM, Jags Ramnarayan <jramnara...@snappydata.io> wrote:
> 
> Ben,
>Note that Snappydata's primary objective is to be a distributed in-memory 
> DB for mixed workloads (i.e. streaming with transactions and analytic 
> queries). On the other hand, Spark, till date, is primarily designed as a 
> processing engine over myriad storage engines (SnappyData being one). So, the 
> marriage is quite complementary. The difference compared to other stores is 
> that SnappyData realizes its solution by deeply integrating and collocating 
> with Spark (i.e. share spark executor memory/resources with the store) 
> avoiding serializations and shuffle in many situations.
> 
> On your specific thought about being similar to Structured streaming, a 
> better discussion could be a comparison to the recently introduced State 
> store 
> <https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7>
>  (perhaps this is what you meant). 
> It proposes a KV store for streaming aggregations with support for updates. 
> The proposed API will, at some point, be pluggable so vendors can easily 
> support alternate implementations to storage, not just HDFS(default store in 
> proposed State store). 
> 
> 
> -
> Jags
> SnappyData blog <http://www.snappydata.io/blog>
> Download binary, source <https://github.com/SnappyDataInc/snappydata>
> 
> 
> On Wed, Jul 6, 2016 at 12:49 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I recently got a sales email from SnappyData, and after reading the 
> documentation about what they offer, it sounds very similar to what 
> Structured Streaming will offer w/o the underlying in-memory, spill-to-disk, 
> CRUD compliant data storage in SnappyData. I was wondering if Structured 
> Streaming is trying to achieve the same on its own or is SnappyData 
> contributing Streaming extensions that they built to the Spark project. 
> Lastly, what does the Spark community think of this so-called “Spark Data 
> Store”?
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



HBase-Spark Module

2016-07-29 Thread Benjamin Kim
Has anyone tried using the hbase-spark module? I tried to follow the examples 
in conjunction with CDH 5.8.0, but I cannot find the HBaseTableCatalog class in 
the module or in any of the Spark jars. Can someone help?
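
One way to narrow down a missing-class problem like this is to probe the classpath from the Spark shell before running the examples. The sketch below is only an illustration: `ClasspathCheck` and `isOnClasspath` are hypothetical names, and the fully qualified class name used as the default is an assumption that may not match the package used by the hbase-spark module in a given CDH release.

```scala
import scala.util.Try

object ClasspathCheck {
  // Returns true if the named class can be loaded by the current class loader.
  // initialize = false avoids running static initializers just to probe.
  def isOnClasspath(fqcn: String): Boolean = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Try(Class.forName(fqcn, false, loader)).isSuccess
  }

  def main(args: Array[String]): Unit = {
    // Default name is an assumption; pass the names your examples import instead.
    val names =
      if (args.nonEmpty) args.toSeq
      else Seq("org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog")
    names.foreach { name =>
      val status = if (isOnClasspath(name)) "found" else "missing"
      println(s"$name -> $status")
    }
  }
}
```

If the class is reported as missing under every package name the examples use, the module on the cluster probably predates the examples being followed.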

Thanks,
Ben



Kudu Connector

2016-06-29 Thread Benjamin Kim
I was wondering if anyone, who is a Spark Scala developer, would be willing to 
continue the work done for the Kudu connector?

https://github.com/apache/incubator-kudu/tree/master/java/kudu-spark/src/main/scala/org/kududb/spark/kudu

I have been testing and using Kudu for the past month and comparing it against 
HBase. It seems like a promising data store to complement Spark. It fills the 
gap in our company as a fast, updatable data store. We stream gigabytes of data 
in and run analytical queries against it, which typically finish in well under 
a minute. According to the Kudu users group, all the connector needs is SQL 
(JDBC) friendly features (CREATE TABLE, intuitive save modes (append = upsert 
and overwrite = truncate + insert), DELETE, etc.) and better performance 
through locality.
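
To make the save-mode wording concrete, here is a minimal in-memory sketch of the intended semantics. It is not the kudu-spark API: `SaveModeSketch`, `Table`, and both functions are hypothetical, and a table is modeled simply as a map from primary key to row value.

```scala
object SaveModeSketch {
  // Hypothetical model: a table is a map from primary key to row value.
  type Table = Map[String, String]

  // "append" treated as upsert: new keys are inserted, existing keys are updated.
  def appendAsUpsert(table: Table, rows: Seq[(String, String)]): Table =
    table ++ rows

  // "overwrite" treated as truncate + insert: the old rows are discarded,
  // so the existing table contents are deliberately ignored.
  def overwriteAsTruncateInsert(table: Table, rows: Seq[(String, String)]): Table =
    rows.toMap

  def main(args: Array[String]): Unit = {
    val existing: Table = Map("k1" -> "old", "k2" -> "keep")
    val incoming = Seq("k1" -> "new", "k3" -> "added")
    println(appendAsUpsert(existing, incoming))
    println(overwriteAsTruncateInsert(existing, incoming))
  }
}
```

With these semantics, append never fails on a duplicate key, which is presumably what "intuitive" means for a store with primary keys.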

For reference, here is the page on contributing.

http://kudu.apache.org/docs/contributing.html

I am hoping that for individuals in the Spark community it would be relatively 
easy.

Thanks!



Re: HBase Spark

2017-02-03 Thread Benjamin Kim
Asher,

I found a profile for Scala 2.11 and removed it. Now, it brings in 2.10. I ran 
some code and got further. Now, I get the error below when I do a “df.show”.

java.lang.AbstractMethodError
    at org.apache.spark.Logging$class.log(Logging.scala:50)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseFilter$.log(HBaseFilter.scala:122)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseFilter$.buildFilters(HBaseFilter.scala:125)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD.getPartitions(HBaseTableScan.scala:59)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
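
An AbstractMethodError inside a trait method such as Logging often points to a library compiled against a different build of Spark or Scala than the one on the cluster. A quick way to see which jar each class actually comes from is sketched below; `JarLocator` and `jarOf` are hypothetical helper names, and only standard JVM reflection calls are used.

```scala
object JarLocator {
  // Returns the location (usually a jar path) a class was loaded from, or
  // None for classes from the JVM bootstrap loader, which have no code source.
  def jarOf(cls: Class[_]): Option[String] =
    for {
      domain   <- Option(cls.getProtectionDomain)
      source   <- Option(domain.getCodeSource)
      location <- Option(source.getLocation)
    } yield location.toString

  def main(args: Array[String]): Unit = {
    // In a Spark shell one would pass a Spark class and a connector class here.
    println(jarOf(classOf[scala.Option[_]]))
    println(jarOf(classOf[String]))
  }
}
```

Comparing the jar reported for a Spark class with the Spark version the connector was built against may show whether the two disagree.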

Thanks for all your help.

Cheers,
Ben


> On Feb 3, 2017, at 8:16 AM, Asher Krim <ak...@hubspot.com> wrote:
> 
> Did you check the actual maven dep tree? Something might be pulling in a 
> different version. Also, if you're seeing this locally, you might want to 
> check which version of the scala sdk your IDE is using
> 
> Asher Krim
> Senior Software Engineer
> 
> 
> On Thu, Feb 2, 2017 at 5:43 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Asher,
> 
> I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java 
> (1.8) version as our installation. The Scala (2.10.5) version is already the 
> same as ours. But I’m still getting the same error. Can you think of anything 
> else?
> 
> Cheers,
> Ben
> 
> 
>> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com 
>> <mailto:ak...@hubspot.com>> wrote:
>> 
>> Ben,
>> 
>> That looks like a scala version mismatch. Have you checked your dep tree?
>> 
>> Asher Krim
>> Senior Software Engineer
>> 
>> 
>> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Elek,
>> 
>> Can you give me some sample code? I can’t get mine to work.
>> 
>> import org.apache.spark.sql.{SQLContext, _}
>> import org.apache.spark.sql.execution.datasources.hbase._
>> import org.apache.spark.{SparkConf, SparkContext}
>> 
>> def cat = s"""{
>> |"table":{"namespace":"ben", "name":"dmp_test", 
>> "tableCoder":"PrimitiveType"},
>> |"rowkey":"key",
>> |"columns":{
>> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
>> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
>> |}
>> |}""".stripMargin
>> 
>> import sqlContext.implicits._
>> 
>> def withCatalog(cat: String): DataFrame = {
>> sqlContext
>> .read
>> .options(Map(HBaseTableCatalog.tableCatalog->cat))
>> .format("org.apache.spark.sql.execution.datasources.hbase")
>> .load()
>> }
>> 
>> val df = withCatalog(cat)
>> df.show
>> 
>> It gives me this error.
>> 
>> java.lang.NoSuchMethodError: 
>> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>  at 
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>>  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>> 
>> If you can please help, I would be grateful.
>> 
>> Cheers,
>> Ben
>> 
>> 
>>> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net 
>>> <mailto:h...@anzix.net>> wrote:
>>> 
>>> 
>>> I tested this one with hbase 1.2.4:
>>> 
>>> https://github.com/hortonworks-spark/shc 
>>> <https://github.com/hortonworks-spark/shc>
>>> 
>>> Marton
>>> 
>>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>>> tried to build it from source, but I cannot get it to work.
>>>> 
>>>> Thanks,
>>>> Ben
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> 
>> 
>> 
> 
> 



Re: HBase Spark

2017-02-02 Thread Benjamin Kim
Hi Asher,

I modified the pom to use the same Spark (1.6.0), HBase (1.2.0), and Java (1.8) 
versions as our installation. The Scala version (2.10.5) already matches ours. 
But I’m still getting the same error. Can you think of anything else?

Cheers,
Ben


> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com> wrote:
> 
> Ben,
> 
> That looks like a scala version mismatch. Have you checked your dep tree?
> 
> Asher Krim
> Senior Software Engineer
> 
> 
> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Elek,
> 
> Can you give me some sample code? I can’t get mine to work.
> 
> import org.apache.spark.sql.{SQLContext, _}
> import org.apache.spark.sql.execution.datasources.hbase._
> import org.apache.spark.{SparkConf, SparkContext}
> 
> def cat = s"""{
> |"table":{"namespace":"ben", "name":"dmp_test", 
> "tableCoder":"PrimitiveType"},
> |"rowkey":"key",
> |"columns":{
> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
> |}
> |}""".stripMargin
> 
> import sqlContext.implicits._
> 
> def withCatalog(cat: String): DataFrame = {
> sqlContext
> .read
> .options(Map(HBaseTableCatalog.tableCatalog->cat))
> .format("org.apache.spark.sql.execution.datasources.hbase")
> .load()
> }
> 
> val df = withCatalog(cat)
> df.show
> 
> It gives me this error.
> 
> java.lang.NoSuchMethodError: 
> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>   at 
> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>   at 
> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
>   at 
> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>   at 
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
> 
> If you can please help, I would be grateful.
> 
> Cheers,
> Ben
> 
> 
>> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net 
>> <mailto:h...@anzix.net>> wrote:
>> 
>> 
>> I tested this one with hbase 1.2.4:
>> 
>> https://github.com/hortonworks-spark/shc 
>> <https://github.com/hortonworks-spark/shc>
>> 
>> Marton
>> 
>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>> tried to build it from source, but I cannot get it to work.
>>> 
>>> Thanks,
>>> Ben
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> 
> 
> 



Re: HBase Spark

2017-02-03 Thread Benjamin Kim
I'll clean up any .m2 or .ivy directories and try again.

I ran this on our lab cluster for testing.

Cheers,
Ben


On Fri, Feb 3, 2017 at 8:16 AM Asher Krim <ak...@hubspot.com> wrote:

> Did you check the actual maven dep tree? Something might be pulling in a
> different version. Also, if you're seeing this locally, you might want to
> check which version of the scala sdk your IDE is using
>
> Asher Krim
> Senior Software Engineer
>
> On Thu, Feb 2, 2017 at 5:43 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>
> Hi Asher,
>
> I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java
> (1.8) version as our installation. The Scala (2.10.5) version is already
> the same as ours. But I’m still getting the same error. Can you think of
> anything else?
>
> Cheers,
> Ben
>
>
> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com> wrote:
>
> Ben,
>
> That looks like a scala version mismatch. Have you checked your dep tree?
>
> Asher Krim
> Senior Software Engineer
>
> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>
> Elek,
>
> Can you give me some sample code? I can’t get mine to work.
>
> import org.apache.spark.sql.{SQLContext, _}
> import org.apache.spark.sql.execution.datasources.hbase._
> import org.apache.spark.{SparkConf, SparkContext}
>
> def cat = s"""{
> |"table":{"namespace":"ben", "name":"dmp_test",
> "tableCoder":"PrimitiveType"},
> |"rowkey":"key",
> |"columns":{
> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
> |}
> |}""".stripMargin
>
> import sqlContext.implicits._
>
> def withCatalog(cat: String): DataFrame = {
> sqlContext
> .read
> .options(Map(HBaseTableCatalog.tableCatalog->cat))
> .format("org.apache.spark.sql.execution.datasources.hbase")
> .load()
> }
>
> val df = withCatalog(cat)
> df.show
>
>
> It gives me this error.
>
> java.lang.NoSuchMethodError:
> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
> at
> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
> at
> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
> at
> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>
>
> If you can please help, I would be grateful.
>
> Cheers,
> Ben
>
>
> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net> wrote:
>
>
> I tested this one with hbase 1.2.4:
>
> https://github.com/hortonworks-spark/shc
>
> Marton
>
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>
> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I
> tried to build it from source, but I cannot get it to work.
>
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>
>


Re: HBase Spark

2017-02-03 Thread Benjamin Kim
Asher,

You’re right. I don’t see anything but 2.11 being pulled in. Do you know where 
I can change this?
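
For anyone reading the dependency tree by eye, the Scala binary version is visible as the suffix of each artifact ID. The sketch below groups artifact IDs by that suffix so a mix of 2.10 and 2.11 stands out. `ScalaSuffixCheck` and its functions are hypothetical, and the artifact IDs in `main` are illustrative samples rather than output from this build.

```scala
object ScalaSuffixCheck {
  // Matches the Scala binary-version suffix in artifact IDs such as
  // "spark-core_2.10" or "json4s-jackson_2.11".
  private val Suffix = """_(2\.\d+)$""".r

  // Extracts the suffix version from one artifact ID, if present.
  def scalaBinaryVersion(artifactId: String): Option[String] =
    Suffix.findFirstMatchIn(artifactId).map(_.group(1))

  // Groups artifact IDs by Scala binary version; IDs without a suffix are ignored.
  def groupBySuffix(artifactIds: Seq[String]): Map[String, Seq[String]] =
    artifactIds
      .flatMap(id => scalaBinaryVersion(id).map(v => v -> id))
      .groupBy(_._1)
      .map { case (version, pairs) => version -> pairs.map(_._2) }

  def main(args: Array[String]): Unit = {
    // Illustrative sample only; paste the artifact IDs from your own tree.
    val ids = Seq("spark-core_2.10", "spark-sql_2.10", "json4s-jackson_2.11", "hbase-client")
    val groups = groupBySuffix(ids)
    if (groups.size > 1) println(s"Mixed Scala binary versions: $groups")
    else println(s"Single Scala binary version: ${groups.keys.mkString}")
  }
}
```

More than one group usually means a profile or property in the pom is selecting a different Scala version for part of the build.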

Cheers,
Ben


> On Feb 3, 2017, at 10:50 AM, Asher Krim <ak...@hubspot.com> wrote:
> 
> Sorry for my persistence, but did you actually run "mvn dependency:tree 
> -Dverbose=true"? And did you see only scala 2.10.5 being pulled in?
> 
> On Fri, Feb 3, 2017 at 12:33 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Asher,
> 
> It’s still the same. Do you have any other ideas?
> 
> Cheers,
> Ben
> 
> 
>> On Feb 3, 2017, at 8:16 AM, Asher Krim <ak...@hubspot.com 
>> <mailto:ak...@hubspot.com>> wrote:
>> 
>> Did you check the actual maven dep tree? Something might be pulling in a 
>> different version. Also, if you're seeing this locally, you might want to 
>> check which version of the scala sdk your IDE is using
>> 
>> Asher Krim
>> Senior Software Engineer
>> 
>> 
>> On Thu, Feb 2, 2017 at 5:43 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Hi Asher,
>> 
>> I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java 
>> (1.8) version as our installation. The Scala (2.10.5) version is already the 
>> same as ours. But I’m still getting the same error. Can you think of 
>> anything else?
>> 
>> Cheers,
>> Ben
>> 
>> 
>>> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com 
>>> <mailto:ak...@hubspot.com>> wrote:
>>> 
>>> Ben,
>>> 
>>> That looks like a scala version mismatch. Have you checked your dep tree?
>>> 
>>> Asher Krim
>>> Senior Software Engineer
>>> 
>>> 
>>> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> Elek,
>>> 
>>> Can you give me some sample code? I can’t get mine to work.
>>> 
>>> import org.apache.spark.sql.{SQLContext, _}
>>> import org.apache.spark.sql.execution.datasources.hbase._
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> 
>>> def cat = s"""{
>>> |"table":{"namespace":"ben", "name":"dmp_test", 
>>> "tableCoder":"PrimitiveType"},
>>> |"rowkey":"key",
>>> |"columns":{
>>> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
>>> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
>>> |}
>>> |}""".stripMargin
>>> 
>>> import sqlContext.implicits._
>>> 
>>> def withCatalog(cat: String): DataFrame = {
>>> sqlContext
>>> .read
>>> .options(Map(HBaseTableCatalog.tableCatalog->cat))
>>> .format("org.apache.spark.sql.execution.datasources.hbase")
>>> .load()
>>> }
>>> 
>>> val df = withCatalog(cat)
>>> df.show
>>> 
>>> It gives me this error.
>>> 
>>> java.lang.NoSuchMethodError: 
>>> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>>> at 
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>>> at 
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
>>> at 
>>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>> at 
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>>> 
>>> If you can please help, I would be grateful.
>>> 
>>> Cheers,
>>> Ben
>>> 
>>> 
>>>> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net 
>>>> <mailto:h...@anzix.net>> wrote:
>>>> 
>>>> 
>>>> I tested this one with hbase 1.2.4:
>>>> 
>>>> https://github.com/hortonworks-spark/shc 
>>>> <https://github.com/hortonworks-spark/shc>
>>>> 
>>>> Marton
>>>> 
>>>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>>>> tried to build it from source, but I cannot get it to work.
>>>>> 
>>>>> Thanks,
>>>>> Ben
>>>>> -
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>>> 
>>>> 
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 



Re: HBase Spark

2017-02-03 Thread Benjamin Kim
Asher,

It’s still the same. Do you have any other ideas?

Cheers,
Ben


> On Feb 3, 2017, at 8:16 AM, Asher Krim <ak...@hubspot.com> wrote:
> 
> Did you check the actual maven dep tree? Something might be pulling in a 
> different version. Also, if you're seeing this locally, you might want to 
> check which version of the scala sdk your IDE is using
> 
> Asher Krim
> Senior Software Engineer
> 
> 
> On Thu, Feb 2, 2017 at 5:43 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Asher,
> 
> I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java 
> (1.8) version as our installation. The Scala (2.10.5) version is already the 
> same as ours. But I’m still getting the same error. Can you think of anything 
> else?
> 
> Cheers,
> Ben
> 
> 
>> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com 
>> <mailto:ak...@hubspot.com>> wrote:
>> 
>> Ben,
>> 
>> That looks like a scala version mismatch. Have you checked your dep tree?
>> 
>> Asher Krim
>> Senior Software Engineer
>> 
>> 
>> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Elek,
>> 
>> Can you give me some sample code? I can’t get mine to work.
>> 
>> import org.apache.spark.sql.{SQLContext, _}
>> import org.apache.spark.sql.execution.datasources.hbase._
>> import org.apache.spark.{SparkConf, SparkContext}
>> 
>> def cat = s"""{
>> |"table":{"namespace":"ben", "name":"dmp_test", 
>> "tableCoder":"PrimitiveType"},
>> |"rowkey":"key",
>> |"columns":{
>> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
>> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
>> |}
>> |}""".stripMargin
>> 
>> import sqlContext.implicits._
>> 
>> def withCatalog(cat: String): DataFrame = {
>> sqlContext
>> .read
>> .options(Map(HBaseTableCatalog.tableCatalog->cat))
>> .format("org.apache.spark.sql.execution.datasources.hbase")
>> .load()
>> }
>> 
>> val df = withCatalog(cat)
>> df.show
>> 
>> It gives me this error.
>> 
>> java.lang.NoSuchMethodError: 
>> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>  at 
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>>  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>> 
>> If you can please help, I would be grateful.
>> 
>> Cheers,
>> Ben
>> 
>> 
>>> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net 
>>> <mailto:h...@anzix.net>> wrote:
>>> 
>>> 
>>> I tested this one with hbase 1.2.4:
>>> 
>>> https://github.com/hortonworks-spark/shc 
>>> <https://github.com/hortonworks-spark/shc>
>>> 
>>> Marton
>>> 
>>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>>> tried to build it from source, but I cannot get it to work.
>>>> 
>>>> Thanks,
>>>> Ben
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> 
>> 
>> 
> 
> 



HBase Spark

2017-01-31 Thread Benjamin Kim
Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I tried 
to build it from source, but I cannot get it to work.

Thanks,
Ben



Re: HBase Spark

2017-02-02 Thread Benjamin Kim
Elek,

Can you give me some sample code? I can’t get mine to work.

import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}

// Catalog mapping the HBase table ben:dmp_test to DataFrame columns
def cat = s"""{
  |"table":{"namespace":"ben", "name":"dmp_test", "tableCoder":"PrimitiveType"},
  |"rowkey":"key",
  |"columns":{
    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
    |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
  |}
|}""".stripMargin

import sqlContext.implicits._

// Load the HBase table described by the catalog as a DataFrame
def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}

val df = withCatalog(cat)
df.show

It gives me this error.

java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:77)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
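
As far as I know, `scala.runtime.ObjectRef.create` exists in the Scala 2.11 library and later but not in 2.10, so this error is typical of code compiled for 2.11 running on a 2.10 runtime. The sketch below checks what the running JVM actually has; `ObjectRefCheck` is a hypothetical helper, while `scala.util.Properties.versionNumberString` is part of the standard library.

```scala
object ObjectRefCheck {
  // Version of the scala-library actually loaded at runtime, e.g. "2.10.5".
  def runtimeScalaVersion: String = scala.util.Properties.versionNumberString

  // True if the loaded scala-library exposes a method named `create` on ObjectRef.
  def hasObjectRefCreate: Boolean =
    classOf[scala.runtime.ObjectRef[_]].getMethods.exists(_.getName == "create")

  def main(args: Array[String]): Unit = {
    println(s"Scala library at runtime: $runtimeScalaVersion")
    println(s"ObjectRef.create available: $hasObjectRefCreate")
  }
}
```

Running this in the same shell that throws the error shows whether the runtime library, rather than the build, is the 2.10 side of the mismatch.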

If you can please help, I would be grateful.

Cheers,
Ben


> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net> wrote:
> 
> 
> I tested this one with hbase 1.2.4:
> 
> https://github.com/hortonworks-spark/shc
> 
> Marton
> 
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>> tried to build it from source, but I cannot get it to work.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: HBase Spark

2017-01-31 Thread Benjamin Kim
Elek,

If I cannot use the HBase Spark module, then I’ll give it a try.

Thanks,
Ben


> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net> wrote:
> 
> 
> I tested this one with hbase 1.2.4:
> 
> https://github.com/hortonworks-spark/shc
> 
> Marton
> 
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>> tried to build it from source, but I cannot get it to work.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 





Re: Get S3 Parquet File

2017-02-23 Thread Benjamin Kim
Hi Gourav,

My answers are below.

Cheers,
Ben


> On Feb 23, 2017, at 10:57 PM, Gourav Sengupta <gourav.sengu...@gmail.com> 
> wrote:
> 
> Can I ask where are you running your CDH? Is it on premise or have you 
> created a cluster for yourself in AWS?

Our cluster is on premise in our data center.

> Also, I have really never seen s3a used before; that was used way long before, 
> when writing S3 files took a long time, but I think that you are reading it. 
> 
> Any ideas why you are not migrating to Spark 2.1? Besides speed, there are 
> lots of APIs which are new, and the existing ones are being deprecated. 
> Therefore there is a very high chance that you are already working on code 
> which is being deprecated by the SPARK community right now.

We use CDH and upgrade with whatever Spark version they include, which is 
1.6.0. We are waiting for the move to Spark 2.0/2.1.

> And besides that, would you not want to work on a platform which is at least 
> 10 times faster?

What would that be?
> 
> Regards,
> Gourav Sengupta
> 
> On Thu, Feb 23, 2017 at 6:23 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet 
> file from AWS S3. We can read the schema and show some data when the file is 
> loaded into a DataFrame, but when we try to do some operations, such as 
> count, we get this error below.
> 
> com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
> credentials from any provider in the chain
> at 
> com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at 
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
> at 
> parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
> at 
> parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 
> Can anyone help?
> 
> Cheers,
> Ben
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Re: Get S3 Parquet File

2017-02-24 Thread Benjamin Kim
Gourav,

I’ll start experimenting with Spark 2.1 to see if this works.

Cheers,
Ben


> On Feb 24, 2017, at 5:46 AM, Gourav Sengupta <gourav.sengu...@gmail.com> 
> wrote:
> 
> Hi Benjamin,
> 
> First of all, fetching data from S3 while writing code on an on-premise 
> system is a very bad idea. You might want to first copy the data into local 
> HDFS before running your code. Of course, this depends on the volume of data 
> and the internet speed that you have.
> 
> The platform which processes your data at least 10 times faster is SPARK 2.1. 
> And trust me, you do not want to be writing code which you need to update 
> once again in 6 months because newer versions of SPARK have deprecated it.
> 
> 
> Regards,
> Gourav Sengupta
> 
> 
> 
> On Fri, Feb 24, 2017 at 7:18 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Gourav,
> 
> My answers are below.
> 
> Cheers,
> Ben
> 
> 
>> On Feb 23, 2017, at 10:57 PM, Gourav Sengupta <gourav.sengu...@gmail.com 
>> <mailto:gourav.sengu...@gmail.com>> wrote:
>> 
>> Can I ask where are you running your CDH? Is it on premise or have you 
>> created a cluster for yourself in AWS?
> 
> Our cluster is on premise in our data center.
> 
>> Also, I have really never seen s3a used before; that was used way long 
>> before, when writing S3 files took a long time, but I think that you are 
>> reading it. 
>> 
>> Any ideas why you are not migrating to Spark 2.1? Besides speed, there are 
>> lots of APIs which are new, and the existing ones are being deprecated. 
>> Therefore there is a very high chance that you are already working on code 
>> which is being deprecated by the SPARK community right now.
> 
> We use CDH and upgrade with whatever Spark version they include, which is 
> 1.6.0. We are waiting for the move to Spark 2.0/2.1.
> 
>> And besides that, would you not want to work on a platform which is at least 
>> 10 times faster?
> 
> What would that be?
>> 
>> Regards,
>> Gourav Sengupta
>> 
>> On Thu, Feb 23, 2017 at 6:23 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet 
>> file from AWS S3. We can read the schema and show some data when the file is 
>> loaded into a DataFrame, but when we try to do some operations, such as 
>> count, we get this error below.
>> 
>> com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
>> credentials from any provider in the chain
>> at 
>> com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>> at 
>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
>> at 
>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
>> at 
>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
>> at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
>> at 
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
>> at 
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>> at 
>> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
>> at 
>> parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
>> at 
>> parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
>> at 
>> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
>> at 
>> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>

Get S3 Parquet File

2017-02-23 Thread Benjamin Kim
We are trying to use Spark 1.6 within CDH 5.7.1 to read a 1.3 GB Parquet file 
from AWS S3. We can read the schema and show some data when the file is loaded 
into a DataFrame, but when we try an operation such as count, we get the error 
below.

com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
    at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
    at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
    at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
    at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
    at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:180)
    at org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Can anyone help?

Cheers,
Ben


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Get S3 Parquet File

2017-02-23 Thread Benjamin Kim
Aakash,

Here is a code snippet for the keys.

val accessKey = "---"
val secretKey = "---"

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)
hadoopConf.set("spark.hadoop.fs.s3a.access.key",accessKey)
hadoopConf.set("spark.hadoop.fs.s3a.secret.key",secretKey)

val df = sqlContext.read.parquet("s3a://aps.optus/uc2/BI_URL_DATA_HLY_20170201_09.PARQUET.gz")
df.show
df.count

When we do the count, then the error happens.
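
One thing that may be worth ruling out (an assumption, not something confirmed in this thread): the stack trace ends in Executor$TaskRunner, so the failing S3A call runs on an executor rather than on the driver. The two hadoopConf.set calls that use the spark.hadoop. prefix have no effect on a Hadoop Configuration, because S3A reads only fs.s3a.access.key and fs.s3a.secret.key. The spark.hadoop. prefix is meant for Spark properties (SparkConf, spark-defaults.conf, or --conf), which Spark copies into the Hadoop configuration with the prefix removed. A minimal sketch of building those properties follows; the object and helper names are hypothetical.

```scala
// Hypothetical helper: builds the Spark-level properties that Spark copies
// into the Hadoop configuration (dropping the "spark.hadoop." prefix).
object S3aProps {
  def s3aSparkProps(accessKey: String, secretKey: String): Map[String, String] =
    Map(
      "spark.hadoop.fs.s3a.access.key" -> accessKey,
      "spark.hadoop.fs.s3a.secret.key" -> secretKey
    )
}

// Usage sketch (needs Spark on the classpath, so it is left as a comment):
//   val conf = new org.apache.spark.SparkConf().setAppName("s3a-test")
//   S3aProps.s3aSparkProps(accessKey, secretKey).foreach { case (k, v) => conf.set(k, v) }
//   val sc = new org.apache.spark.SparkContext(conf)
```

Whether this resolves the error on CDH 5.7.1 is untested here; it only makes sure the keys are part of the configuration before the context is created.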

Thanks,
Ben


> On Feb 23, 2017, at 10:31 AM, Aakash Basu <aakash.spark@gmail.com> wrote:
> 
> Hey,
> 
> Please recheck your access key and secret key being used to fetch the parquet 
> file. It seems to be a credential error. Either mismatch/load. If load, then 
> first use it directly in code and see if the issue resolves, then it can be 
> hidden and read from Input Params.
> 
> Thanks,
> Aakash.
> 
> 
> On 23-Feb-2017 11:54 PM, "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet 
> file from AWS S3. We can read the schema and show some data when the file is 
> loaded into a DataFrame, but when we try to do some operations, such as 
> count, we get this error below.
> 
> com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
> credentials from any provider in the chain
> at 
> com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at 
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
> at 
> parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
> at 
> parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 
> Can anyone help?
> 
> Cheers,
> Ben
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Remove dependence on HDFS

2017-02-11 Thread Benjamin Kim
Does anyone have advice on how to remove the reliance on HDFS for storing 
persistent data? We have an on-premise Spark cluster, and it seems like a waste 
of resources to keep adding nodes only because we lack storage space. I would 
rather add more powerful nodes less often, when we run short of processing 
power, than add less powerful nodes more often just to handle the ever-growing 
data. Can anyone point me in the right direction? Is Alluxio a good solution? 
S3? I would like to hear your thoughts.

Cheers,
Ben 



Re: Parquet Gzipped Files

2017-02-14 Thread Benjamin Kim
Jörn,

I agree with you, but the vendor is a little difficult to work with. For now, I 
will try to decompress it from S3 and save it plainly into HDFS. If someone 
already has this example, please let me know.
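
A minimal sketch of the decompress-then-read approach described above, assuming the vendor file is an ordinary gzip stream wrapped around a complete Parquet file. sc.textFile and sc.wholeTextFiles treat their input as text, so they are unlikely to suit a binary Parquet payload. The object and method names are hypothetical; only java.util.zip is used, so the copy itself has no Spark dependency.

```scala
import java.io.{InputStream, OutputStream}
import java.util.zip.GZIPInputStream

object Gunzip {
  // Copies the decompressed bytes of a gzip stream to `out` and returns the
  // number of bytes written. The caller is responsible for closing both streams.
  def gunzip(in: InputStream, out: OutputStream): Long = {
    val gz = new GZIPInputStream(in)
    val buf = new Array[Byte](64 * 1024)
    var total = 0L
    var n = gz.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      total += n
      n = gz.read(buf)
    }
    out.flush()
    total
  }
}
```

With Hadoop's FileSystem API the streams could come from fs.open(srcPath) and fs.create(dstPath), after which sqlContext.read.parquet(dstPath.toString) should be able to read the result. The paths are placeholders, and the copy runs in a single process, so a 1.3GB file is decompressed sequentially.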

Cheers,
Ben


> On Feb 13, 2017, at 9:50 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> Your vendor should use the parquet internal compression and not take a 
> parquet file and gzip it.
> 
>> On 13 Feb 2017, at 18:48, Benjamin Kim <bbuil...@gmail.com> wrote:
>> 
>> We are receiving files from an outside vendor who creates a Parquet data 
>> file and Gzips it before delivery. Does anyone know how to Gunzip the file 
>> in Spark and inject the Parquet data into a DataFrame? I thought using 
>> sc.textFile or sc.wholeTextFiles would automatically Gunzip the file, but 
>> I’m getting a decompression header error when trying to open the Parquet 
>> file.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 





Parquet Gzipped Files

2017-02-13 Thread Benjamin Kim
We are receiving files from an outside vendor who creates a Parquet data file 
and Gzips it before delivery. Does anyone know how to Gunzip the file in Spark 
and inject the Parquet data into a DataFrame? I thought using sc.textFile or 
sc.wholeTextFiles would automatically Gunzip the file, but I’m getting a 
decompression header error when trying to open the Parquet file.

Thanks,
Ben



Spark SQL Tables on top of HBase Tables

2016-09-02 Thread Benjamin Kim
I was wondering if anyone has tried to create Spark SQL tables on top of HBase 
tables so that data in HBase can be accessed using Spark Thriftserver with SQL 
statements? This is similar to what can be done using Hive.

Thanks,
Ben





Re: Spark SQL Tables on top of HBase Tables

2016-09-03 Thread Benjamin Kim
Mich,

I’m in the same boat. We can use Hive but not Spark.

Cheers,
Ben

> On Sep 2, 2016, at 3:37 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> Hi,
> 
> You can create Hive external  tables on top of existing Hbase table using the 
> property
> 
> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> 
> Example
> 
> hive> show create table hbase_table;
> OK
> CREATE TABLE `hbase_table`(
>   `key` int COMMENT '',
>   `value1` string COMMENT '',
>   `value2` int COMMENT '',
>   `value3` int COMMENT '')
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.hbase.HBaseSerDe'
> STORED BY
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> WITH SERDEPROPERTIES (
>   'hbase.columns.mapping'=':key,a:b,a:c,d:e',
>   'serialization.format'='1')
> TBLPROPERTIES (
>   'transient_lastDdlTime'='1472370939')
> 
>  Then try to access this Hive table from Spark which is giving me grief at 
> the moment :(
> 
> scala> HiveContext.sql("use test")
> res9: org.apache.spark.sql.DataFrame = []
> scala> val hbase_table= spark.table("hbase_table")
> 16/09/02 23:31:07 ERROR log: error in initSerDe: 
> java.lang.ClassNotFoundException Class 
> org.apache.hadoop.hive.hbase.HBaseSerDe not found
> 
> HTH
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 2 September 2016 at 23:08, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com 
> <mailto:mdkhajaasm...@gmail.com>> wrote:
> Hi Kim,
> 
> I am also looking for same information. Just got the same requirement today.
> 
> Thanks,
> Asmath
> 
> On Fri, Sep 2, 2016 at 4:46 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I was wondering if anyone has tried to create Spark SQL tables on top of 
> HBase tables so that data in HBase can be accessed using Spark Thriftserver 
> with SQL statements? This is similar to what can be done using Hive.
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 
> 



Re: Spark SQL Tables on top of HBase Tables

2016-09-03 Thread Benjamin Kim
I’m using Spark 1.6 and HBase 1.2. Have you got it to work using these versions?

> On Sep 3, 2016, at 12:49 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> I am trying to find a solution for this
> 
> ERROR log: error in initSerDe: java.lang.ClassNotFoundException Class 
> org.apache.hadoop.hive.hbase.HBaseSerDe not found
> 
> I am using Spark 2 and Hive 2!
> 
> HTH
> 
> 
> 
> 
> 
> On 3 September 2016 at 20:31, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Mich,
> 
> I’m in the same boat. We can use Hive but not Spark.
> 
> Cheers,
> Ben
> 
>> On Sep 2, 2016, at 3:37 PM, Mich Talebzadeh <mich.talebza...@gmail.com 
>> <mailto:mich.talebza...@gmail.com>> wrote:
>> 
>> Hi,
>> 
>> You can create Hive external  tables on top of existing Hbase table using 
>> the property
>> 
>> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
>> 
>> Example
>> 
>> hive> show create table hbase_table;
>> OK
>> CREATE TABLE `hbase_table`(
>>   `key` int COMMENT '',
>>   `value1` string COMMENT '',
>>   `value2` int COMMENT '',
>>   `value3` int COMMENT '')
>> ROW FORMAT SERDE
>>   'org.apache.hadoop.hive.hbase.HBaseSerDe'
>> STORED BY
>>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
>> WITH SERDEPROPERTIES (
>>   'hbase.columns.mapping'=':key,a:b,a:c,d:e',
>>   'serialization.format'='1')
>> TBLPROPERTIES (
>>   'transient_lastDdlTime'='1472370939')
>> 
>>  Then try to access this Hive table from Spark which is giving me grief at 
>> the moment :(
>> 
>> scala> HiveContext.sql("use test")
>> res9: org.apache.spark.sql.DataFrame = []
>> scala> val hbase_table= spark.table("hbase_table")
>> 16/09/02 23:31:07 ERROR log: error in initSerDe: 
>> java.lang.ClassNotFoundException Class 
>> org.apache.hadoop.hive.hbase.HBaseSerDe not found
>> 
>> HTH
>> 
>> 
>> 
>> On 2 September 2016 at 23:08, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com 
>> <mailto:mdkhajaasm...@gmail.com>> wrote:
>> Hi Kim,
>> 
>> I am also looking for same information. Just got the same requirement today.
>> 
>> Thanks,
>> Asmath
>> 
>> On Fri, Sep 2, 2016 at 4:46 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> I was wondering if anyone has tried to create Spark SQL tables on top of 
>> HBase tables so that data in HBase can be accessed using Spark Thriftserver 
>> with SQL statements? This is similar to what can be done using Hive.
>> 
>> Thanks,
>> Ben
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> 
>> 
>> 
> 
> 



Re: Spark SQL Thriftserver

2016-09-13 Thread Benjamin Kim
Mich,

It sounds like there would be no harm in changing, then. Are you saying 
that using STS would still use MapReduce to run the SQL statements? What our 
users are doing in our CDH 5.7.2 installation is changing the execution engine 
to Spark when connected to HiveServer2 to get faster results. Would they still 
have to do this using STS? Lastly, we are seeing zombie YARN jobs left behind 
even after a user disconnects. Are you seeing this happen with STS? If not, 
then this would be even better.

Thanks for your fast reply.

Cheers,
Ben

> On Sep 13, 2016, at 3:15 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> Hi,
> 
> Spark Thrift server (STS) still uses hive thrift server. If you look at 
> $SPARK_HOME/sbin/start-thriftserver.sh you will see (mine is Spark 2)
> 
> function usage {
>   echo "Usage: ./sbin/start-thriftserver [options] [thrift server options]"
>   pattern="usage"
>   pattern+="\|Spark assembly has been built with Hive"
>   pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
>   pattern+="\|Spark Command: "
>   pattern+="\|==="
>   pattern+="\|--help"
> 
> 
> Indeed when you start STS, you pass hiveconf parameter to it
> 
> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> --master  \
> --hiveconf hive.server2.thrift.port=10055 \
> 
> and STS bypasses Spark optimiser and uses Hive optimizer and execution 
> engine. You will see this in hive.log file
> 
> So I don't think it is going to give you much difference. Unless they have 
> recently changed the design of STS.
> 
> HTH
> 
> 
> 
> 
> 
> On 13 September 2016 at 22:32, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Does anyone have any thoughts about using Spark SQL Thriftserver in Spark 
> 1.6.2 instead of HiveServer2? We are considering abandoning HiveServer2 for 
> it. Some advice and gotchas would be nice to know.
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Spark SQL Thriftserver

2016-09-13 Thread Benjamin Kim
Does anyone have any thoughts about using Spark SQL Thriftserver in Spark 1.6.2 
instead of HiveServer2? We are considering abandoning HiveServer2 for it. Some 
advice and gotchas would be nice to know.

Thanks,
Ben



Using Spark SQL to Create JDBC Tables

2016-09-13 Thread Benjamin Kim
Has anyone created tables using Spark SQL that directly connect to a JDBC data 
source such as PostgreSQL? I would like to use Spark SQL Thriftserver to access 
and query remote PostgreSQL tables. In this way, we can centralize data access 
to Spark SQL tables along with PostgreSQL making it very convenient for users. 
They would not know or care where the data is physically located anymore.

By the way, our users only know SQL.

If anyone has a better suggestion, then please let me know too.
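
For reference, the Spark SQL 1.x documentation describes a CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.jdbc statement for this. A minimal sketch that builds that statement as a string; the object name, helper name, and the example table, URL, and dbtable values are all hypothetical.

```scala
object JdbcTableDdl {
  // Builds the Spark SQL 1.x DDL for a temporary table backed by a JDBC source.
  def jdbcTempTableDdl(tableName: String, url: String, dbtable: String): String =
    s"""CREATE TEMPORARY TABLE $tableName
       |USING org.apache.spark.sql.jdbc
       |OPTIONS (
       |  url "$url",
       |  dbtable "$dbtable"
       |)""".stripMargin
}

// Usage sketch (needs a SQLContext, so it is left as a comment):
//   sqlContext.sql(JdbcTableDdl.jdbcTempTableDdl(
//     "pg_impressions", "jdbc:postgresql://dbhost:5432/db", "public.impressions"))
```

The PostgreSQL JDBC driver still has to be on the classpath, and whether a temporary table created this way is visible to other Thriftserver sessions depends on the deployment, so that part is left open here.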

Thanks,
Ben



Re: Using Spark SQL to Create JDBC Tables

2016-09-13 Thread Benjamin Kim
Thank you for the idea. I will look for a PostgreSQL Serde for Hive. But, if 
you don’t mind me asking, how did you install the Oracle Serde?

Cheers,
Ben


> On Sep 13, 2016, at 7:12 PM, ayan guha <guha.a...@gmail.com> wrote:
> 
> One option is to have Hive as the central point for exposing data, i.e. create 
> Hive tables which "point to" any other DB. I know Oracle provides their own 
> SerDe for Hive. Not sure about PG though.
> 
> Once tables are created in Hive, STS will automatically see them. 
> 
> On Wed, Sep 14, 2016 at 11:08 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone created tables using Spark SQL that directly connect to a JDBC 
> data source such as PostgreSQL? I would like to use Spark SQL Thriftserver to 
> access and query remote PostgreSQL tables. In this way, we can centralize 
> data access to Spark SQL tables along with PostgreSQL making it very 
> convenient for users. They would not know or care where the data is 
> physically located anymore.
> 
> By the way, our users only know SQL.
> 
> If anyone has a better suggestion, then please let me know too.
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



Inserting New Primary Keys

2016-10-08 Thread Benjamin Kim
I have a table with data already in it that has primary keys generated by the 
function monotonicallyIncreasingId. Now, I want to insert more data into it 
with primary keys that will auto-increment from where the existing data left 
off. How would I do this? There is no argument I can pass into the function 
monotonicallyIncreasingId to seed it.
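
As background on why the existing keys are not a simple 1..N sequence: the Spark documentation states that the current implementation of this function puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below re-creates that layout in plain Scala for illustration only; it is not Spark's own code, and the object and method names are hypothetical.

```scala
object MonoId {
  // Mirrors the documented layout: partition ID in the upper 31 bits,
  // record number within the partition in the lower 33 bits.
  def monoId(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << 33) + recordNumber
}
```

Under that layout the first record of partition 1 gets 2^33 = 8589934592, so the IDs are unique and increasing but far from consecutive, which matters when choosing a starting point for newly inserted rows.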

Thanks,
Ben





Re: JDBC Very Slow

2016-09-16 Thread Benjamin Kim
I am testing this in spark-shell. I am following the Spark documentation by 
simply adding the PostgreSQL driver to the Spark Classpath.

SPARK_CLASSPATH=/path/to/postgresql/driver spark-shell

Then, I run the code below to connect to the PostgreSQL database to query. This 
is when I have problems.

Thanks,
Ben


> On Sep 16, 2016, at 3:29 PM, Nikolay Zhebet <phpap...@gmail.com> wrote:
> 
> Hi! Can you separate the initialization code from the current command? I 
> think that is the main problem in your code.
> 
> On 16 Sep 2016 at 8:26 PM, "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone using Spark 1.6.2 encountered very slow responses from pulling 
> data from PostgreSQL using JDBC? I can get to the table and see the schema, 
> but when I do a show, it takes very long or keeps timing out.
> 
> The code is simple.
> 
> val jdbcDF = sqlContext.read.format("jdbc").options(
>   Map("url" -> "jdbc:postgresql://dbserver:port/database?user=user&password=password",
>       "dbtable" -> "schema.table")).load()
> 
> jdbcDF.show
> 
> If anyone can help, please let me know.
> 
> Thanks,
> Ben
> 



JDBC Very Slow

2016-09-16 Thread Benjamin Kim
Has anyone using Spark 1.6.2 encountered very slow responses from pulling data 
from PostgreSQL using JDBC? I can get to the table and see the schema, but when 
I do a show, it takes very long or keeps timing out.

The code is simple.

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql://dbserver:port/database?user=user&password=password",
      "dbtable" -> "schema.table")).load()

jdbcDF.show
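
This is not a diagnosis of the slowness, but one documented knob is worth noting: without partitioning options the JDBC source reads the table through a single connection into one partition. The options partitionColumn, lowerBound, upperBound, and numPartitions split the read into ranges. A minimal sketch of an options map, assuming the table has a numeric column (called id here, which is a hypothetical name); the object and helper names are also hypothetical.

```scala
object JdbcReadOptions {
  // Builds the options map for a range-partitioned JDBC read.
  def partitionedOptions(
      url: String,
      dbtable: String,
      partitionColumn: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int): Map[String, String] =
    Map(
      "url" -> url,
      "dbtable" -> dbtable,
      "partitionColumn" -> partitionColumn,
      "lowerBound" -> lowerBound.toString,
      "upperBound" -> upperBound.toString,
      "numPartitions" -> numPartitions.toString
    )
}

// Usage sketch (needs a SQLContext, so it is left as a comment):
//   val opts = JdbcReadOptions.partitionedOptions(url, "schema.table", "id", 1L, 1000000L, 8)
//   val jdbcDF = sqlContext.read.format("jdbc").options(opts).load()
```

The bounds only decide how the ranges are cut; rows outside them are still read, so rough values are enough for a first test.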

If anyone can help, please let me know.

Thanks,
Ben



Re: Spark Metrics: custom source/sink configurations not getting recognized

2016-09-07 Thread Benjamin Kim
We use Graphite/Grafana for custom metrics. We found Spark’s metrics not to be 
customizable. So, we write directly using Graphite’s API, which was very easy 
to do using Java’s socket library in Scala. It works great for us, and we are 
going one step further using Sensu to alert us if there is an anomaly in the 
metrics beyond the norm.
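
A minimal sketch of what writing to Graphite over a plain socket can look like, assuming Graphite's plaintext protocol (one "path value timestamp" line per metric, usually on port 2003). The object name, method names, and metric path are hypothetical, and this is not necessarily the exact code we run.

```scala
import java.io.PrintWriter
import java.net.Socket

object GraphiteClient {
  // One line of Graphite's plaintext protocol: "<path> <value> <epoch seconds>\n".
  def formatMetric(path: String, value: Double, epochSeconds: Long): String =
    s"$path $value $epochSeconds\n"

  // Opens a TCP connection, writes the lines, and closes the socket.
  def send(host: String, port: Int, lines: Seq[String]): Unit = {
    val socket = new Socket(host, port)
    try {
      val out = new PrintWriter(socket.getOutputStream, true)
      lines.foreach(line => out.print(line))
      out.flush()
    } finally socket.close()
  }
}

// Usage sketch (host name is a placeholder):
//   val now = System.currentTimeMillis() / 1000
//   GraphiteClient.send("graphite.example.com", 2003,
//     Seq(GraphiteClient.formatMetric("app.events.count", 42.0, now)))
```

Opening a socket per call keeps the sketch short; a long-running streaming job would more likely reuse one connection per executor.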

Hope this helps.

Cheers,
Ben


> On Sep 6, 2016, at 9:52 PM, map reduced  wrote:
> 
> Hi, anyone has any ideas please?
> 
> On Mon, Sep 5, 2016 at 8:30 PM, map reduced  > wrote:
> Hi,
> 
> I've written my custom metrics source/sink for my Spark streaming app and I 
> am trying to initialize it from metrics.properties - but that doesn't work 
> from executors. I don't have control on the machines in Spark cluster, so I 
> can't copy properties file in $SPARK_HOME/conf/ in the cluster. I have it in 
> the fat jar where my app lives, but by the time my fat jar is downloaded on 
> worker nodes in cluster, executors are already started and their Metrics 
> system is already initialized - thus not picking my file with custom source 
> configuration in it.
> 
> Following this post, I've specified 'spark.files = metrics.properties' and 
> 'spark.metrics.conf=metrics.properties' but by the time 'metrics.properties' 
> is shipped to executors, their metric system is already initialized.
> 
> If I initialize my own metrics system, it's picking up my file but then I'm 
> missing master/executor level metrics/properties (eg. 
> executor.sink.mySink.propName=myProp - can't read 'propName' from 'mySink') 
> since they are initialized by Spark's metric system.
> 
> Is there a (programmatic) way to have 'metrics.properties' shipped before 
> executors initialize?
> 
> Here's my SO question.
> 
> Thanks,
> 
> KP
> 
> 



Spark 1.6 Streaming with Checkpointing

2016-08-26 Thread Benjamin Kim
I am trying to implement checkpointing in my streaming application but I am 
getting a not serializable error. Has anyone encountered this? I am deploying 
this job in YARN clustered mode.

Here is a snippet of the main parts of the code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3EventIngestion {
  // Create and set up the streaming context
  def createContext(
      batchInterval: Integer, checkpointDirectory: String, awsS3BucketName: String,
      databaseName: String, tableName: String, partitionByColumnName: String
  ): StreamingContext = {

    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("S3EventIngestion")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Create the streaming context with batch interval
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // Create a text file stream on an S3 bucket
    val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

    csv.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        // process data
      }
    })

    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 6) {
      System.err.println("Usage: S3EventIngestion")
      System.exit(1)
    }

    // Get streaming context from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpoint,
      () => createContext(interval, checkpoint, bucket, database, table, partitionBy))

    // Start the streaming context
    context.start()
    context.awaitTermination()
  }
}

Can someone help please?

Thanks,
Ben



Re: Loading data into Hbase table throws NoClassDefFoundError: org/apache/htrace/Trace error

2016-10-01 Thread Benjamin Kim
Mich,

I know up until CDH 5.4 we had to add the HTrace jar to the classpath to make 
it work using the command below. But after upgrading to CDH 5.7, it became 
unnecessary.

echo "/opt/cloudera/parcels/CDH/jars/htrace-core-3.2.0-incubating.jar" >> /etc/spark/conf/classpath.txt

Hope this helps.
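
A quick way to see whether a class such as org.apache.htrace.Trace is actually visible is to try loading it. The sketch below is plain Scala with hypothetical object and method names; it checks only the JVM it runs in, so in spark-shell it reports on the driver, not on the executors.

```scala
import scala.util.Try

object ClasspathCheck {
  // True if the named class can be loaded in the current JVM.
  def classAvailable(className: String): Boolean =
    Try(Class.forName(className)).isSuccess
}

// Usage sketch in spark-shell:
//   ClasspathCheck.classAvailable("org.apache.htrace.Trace")
```

If this returns false on the driver even though the jar is listed in spark-defaults.conf, the path or the jar version would be the first things to double-check.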

Cheers,
Ben


> On Oct 1, 2016, at 3:22 PM, Mich Talebzadeh  wrote:
> 
> Trying bulk load using Hfiles in Spark as below example:
> 
> import org.apache.spark._
> import org.apache.spark.rdd.NewHadoopRDD
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.HTable;
> import org.apache.hadoop.hbase.mapred.TableOutputFormat
> import org.apache.hadoop.mapred.JobConf
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.mapreduce.Job
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
> import org.apache.hadoop.hbase.KeyValue
> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
> import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
> 
> So far no issues.
> 
> Then I do
> 
> val conf = HBaseConfiguration.create()
> conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, 
> core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, 
> yarn-site.xml, hbase-default.xml, hbase-site.xml
> val tableName = "testTable"
> tableName: String = testTable
> 
> But this one fails:
> 
> scala> val table = new HTable(conf, tableName)
> java.io.IOException: java.lang.reflect.InvocationTargetException
>   at 
> org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager.createConnection(ConnectionManager.java:431)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager.createConnection(ConnectionManager.java:424)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager.getConnectionInternal(ConnectionManager.java:302)
>   at org.apache.hadoop.hbase.client.HTable.(HTable.java:185)
>   at org.apache.hadoop.hbase.client.HTable.(HTable.java:151)
>   ... 52 elided
> Caused by: java.lang.reflect.InvocationTargetException: 
> java.lang.NoClassDefFoundError: org/apache/htrace/Trace
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
>   ... 57 more
> Caused by: java.lang.NoClassDefFoundError: org/apache/htrace/Trace
>   at 
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:216)
>   at org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:419)
>   at 
> org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65)
>   at 
> org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:105)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:905)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.(ConnectionManager.java:648)
>   ... 62 more
> Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace
> 
> I have got all the jar files in spark-defaults.conf
> 
> spark.driver.extraClassPath  
> /home/hduser/jars/ojdbc6.jar:/home/hduser/jars/jconn4.jar:/home/hduser/jars/hbase-client-1.2.3.jar:/home/hduser/jars/hbase-server-1.2.3.jar:/home/hduser/jars/hbase-common-1.2.3.jar:/home/hduser/jars/hbase-protocol-1.2.3.jar:/home/hduser/jars/htrace-core-3.0.4.jar:/home/hduser/jars/hive-hbase-handler-2.1.0.jar
> spark.executor.extraClassPath
> /home/hduser/jars/ojdbc6.jar:/home/hduser/jars/jconn4.jar:/home/hduser/jars/hbase-client-1.2.3.jar:/home/hduser/jars/hbase-server-1.2.3.jar:/home/hduser/jars/hbase-common-1.2.3.jar:/home/hduser/jars/hbase-protocol-1.2.3.jar:/home/hduser/jars/htrace-core-3.0.4.jar:/home/hduser/jars/hive-hbase-handler-2.1.0.jar
> 
> 
> and also in Spark shell where I test the code
> 
>  --jars 
> /home/hduser/jars/hbase-client-1.2.3.jar,/home/hduser/jars/hbase-server-1.2.3.jar,/home/hduser/jars/hbase-common-1.2.3.jar,/home/hduser/jars/hbase-protocol-1.2.3.jar,/home/hduser/jars/htrace-core-3.0.4.jar,/home/hduser/jars/hive-hbase-handler-2.1.0.jar'
> 
> So any ideas will be appreciated.
> 
> 

Re: Inserting New Primary Keys

2016-10-10 Thread Benjamin Kim
Jean,

I see your point. For the incremental data, which is very small, I should make 
sure that the PARTITION BY in the OVER(PARTITION BY ...) is left out so that 
all the data will be in one partition when assigned a row number. The query 
below should avoid any problems.

“SELECT ROW_NUMBER() OVER() + b.id_max AS id, a.* FROM source a CROSS JOIN 
(SELECT COALESCE(MAX(id),0) AS id_max FROM tmp_destination) b”.

But initially, I’ll use the monotonicallyIncreasingId function when I first 
load the data.
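
The arithmetic in that query can be sketched in plain Scala: ROW_NUMBER() starts at 1, so the first new row gets id_max + 1, and COALESCE(MAX(id),0) makes an empty destination start at 1. The object and method names below are hypothetical, and the sketch assumes a single writer, which is the concern Jean raised.

```scala
object AssignIds {
  // Gives each new row the next ID after `currentMax`, in input order.
  def assignIds[A](currentMax: Long, newRows: Seq[A]): Seq[(Long, A)] =
    newRows.zipWithIndex.map { case (row, i) => (currentMax + i + 1, row) }
}
```

With two processes reading the same MAX(id) at the same time, both would hand out the same IDs, so the load still has to be serialized somewhere.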

Thanks,
Ben


> On Oct 10, 2016, at 8:36 AM, Jean Georges Perrin <j...@jgp.net> wrote:
> 
> Is there only one process adding rows? because this seems a little risky if 
> you have multiple threads doing that… 
> 
>> On Oct 8, 2016, at 1:43 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>> Mich,
>> 
>> After much searching, I found and am trying to use “SELECT ROW_NUMBER() 
>> OVER() + b.id_max AS id, a.* FROM source a CROSS JOIN (SELECT 
>> COALESCE(MAX(id),0) AS id_max FROM tmp_destination) b”. I think this should 
>> do it.
>> 
>> Thanks,
>> Ben
>> 
>> 
>>> On Oct 8, 2016, at 9:48 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>> 
>>> Can you get the max value from the current table and start from MAX(ID) + 
>>> 1, assuming it is a numeric value (it should be)?
>>> 
>>> HTH
>>> 
>>> Dr Mich Talebzadeh
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> http://talebzadehmich.wordpress.com
>>> 
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 
>>> On 8 October 2016 at 17:42, Benjamin Kim <bbuil...@gmail.com> wrote:
>>> I have a table with data already in it that has primary keys generated by 
>>> the function monotonicallyIncreasingId. Now, I want to insert more data 
>>> into it with primary keys that will auto-increment from where the existing 
>>> data left off. How would I do this? There is no argument I can pass into 
>>> the function monotonicallyIncreasingId to seed it.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> 
>>> 
>> 
> 



Spark Streaming and Kinesis

2016-10-27 Thread Benjamin Kim
Has anyone worked with AWS Kinesis and retrieved data from it using Spark 
Streaming? I am having an issue where it returns no data. I can connect to 
the Kinesis stream and describe it using Spark. Is there something I’m missing? 
Are there specific IAM security settings needed? I simply followed the 
Word Count ASL example. When it didn’t work, I tried running the code 
independently in the Spark shell in yarn-client mode with the arguments 
hardcoded. Still, there was no data, even after changing the setting 
InitialPositionInStream.LATEST to InitialPositionInStream.TRIM_HORIZON.

If anyone can help, I would truly appreciate it.

Thanks,
Ben


