RE: RE: error while creating HiveContext
Hi Sun, I could connect to Hive from the Spark command line and run SQL commands, so I don't think the problem is with the Hive config file. Regards, Anand.C
From: fightf...@163.com Sent: Friday, November 27, 2015 3:25 PM To: Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com>; user <user@spark.apache.org> Subject: Re: RE: error while creating HiveContext
Could you provide your hive-site.xml file info? Best, Sun. fightf...@163.com
From: Chandra Mohan, Ananda Vel Murugan Date: 2015-11-27 17:04 To: fightf...@163.com; user Subject: RE: error while creating HiveContext
Hi, I verified and I can see hive-site.xml in the Spark conf directory. Regards, Anand.C
From: fightf...@163.com Sent: Friday, November 27, 2015 12:53 PM To: Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com>; user <user@spark.apache.org> Subject: Re: error while creating HiveContext
Hi, I think you just need to put hive-site.xml in the spark/conf directory; Spark will then load it onto its classpath. Best, Sun. fightf...@163.com
From: Chandra Mohan, Ananda Vel Murugan Date: 2015-11-27 15:04 To: user Subject: error while creating HiveContext
Hi, I am building a Spark SQL application in Java. I created a Maven project in Eclipse and added all dependencies, including spark-core and spark-sql. I create a HiveContext in my Spark program and then try to run SQL queries against my Hive table. When I submit this job to Spark, for some reason it tries to create a Derby metastore, even though my hive-site.xml clearly specifies the JDBC URL of my MySQL database. So I think my hive-site.xml is not being picked up by my Spark program. I specified the hive-site.xml path using the "--files" argument of spark-submit. I also tried placing hive-site.xml in my jar. I even tried creating a Configuration object with the hive-site.xml path and updating my HiveContext by calling the addResource() method. Where should I put the Hive config files (in my jar, in my Eclipse project, or on my cluster) so that my Spark program picks them up correctly? Thanks for any help. Regards, Anand.C
RE: error while creating HiveContext
Hi, I verified and I can see hive-site.xml in the Spark conf directory. Regards, Anand.C
From: fightf...@163.com Sent: Friday, November 27, 2015 12:53 PM To: Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com>; user <user@spark.apache.org> Subject: Re: error while creating HiveContext
Hi, I think you just need to put hive-site.xml in the spark/conf directory; Spark will then load it onto its classpath. Best, Sun. fightf...@163.com
From: Chandra Mohan, Ananda Vel Murugan Date: 2015-11-27 15:04 To: user Subject: error while creating HiveContext
Hi, I am building a Spark SQL application in Java. I created a Maven project in Eclipse and added all dependencies, including spark-core and spark-sql. I create a HiveContext in my Spark program and then try to run SQL queries against my Hive table. When I submit this job to Spark, for some reason it tries to create a Derby metastore, even though my hive-site.xml clearly specifies the JDBC URL of my MySQL database. So I think my hive-site.xml is not being picked up by my Spark program. I specified the hive-site.xml path using the "--files" argument of spark-submit. I also tried placing hive-site.xml in my jar. I even tried creating a Configuration object with the hive-site.xml path and updating my HiveContext by calling the addResource() method. Where should I put the Hive config files (in my jar, in my Eclipse project, or on my cluster) so that my Spark program picks them up correctly? Thanks for any help. Regards, Anand.C
error while creating HiveContext
Hi, I am building a Spark SQL application in Java. I created a Maven project in Eclipse and added all dependencies, including spark-core and spark-sql. I create a HiveContext in my Spark program and then try to run SQL queries against my Hive table. When I submit this job to Spark, for some reason it tries to create a Derby metastore, even though my hive-site.xml clearly specifies the JDBC URL of my MySQL database. So I think my hive-site.xml is not being picked up by my Spark program. I specified the hive-site.xml path using the "--files" argument of spark-submit. I also tried placing hive-site.xml in my jar. I even tried creating a Configuration object with the hive-site.xml path and updating my HiveContext by calling the addResource() method. Where should I put the Hive config files (in my jar, in my Eclipse project, or on my cluster) so that my Spark program picks them up correctly? Thanks for any help. Regards, Anand.C
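HiveContext reads hive-site.xml as a classpath resource, which is why dropping it into spark/conf is the usual advice. When the file is not found, Hive's defaults point the metastore at an embedded Derby database, which would match the symptom described. A quick, Spark-free way to see what the driver JVM actually finds is to ask its class loader for the resource. HiveSiteCheck is an illustrative helper (not part of Spark or Hive); the same getResource call could be pasted at the start of the job's main method. It only checks the driver side, and which machine's classpath matters depends on where the driver runs (client vs cluster deploy mode).

```java
import java.net.URL;

public class HiveSiteCheck {

    // Asks the given class loader for hive-site.xml; returns its URL, or null if it is not visible.
    static URL findHiveSite(ClassLoader loader) {
        return loader.getResource("hive-site.xml");
    }

    public static void main(String[] args) {
        // Hadoop's Configuration (which HiveConf extends) uses the thread context class loader by default.
        URL url = findHiveSite(Thread.currentThread().getContextClassLoader());
        if (url == null) {
            System.out.println("hive-site.xml is NOT on the driver classpath");
        } else {
            System.out.println("hive-site.xml found at " + url);
        }
    }
}
```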
Count of streams processed
Hi, Is it possible to keep a running count of the number of Kafka messages processed in a Spark Streaming application? Thanks, Regards, Anand.C
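One way to keep such a count (a sketch, not an answer taken from this thread): inside foreachRDD on the Kafka DStream, call rdd.count() for each batch and add the result to a total held on the driver, since the function passed to foreachRDD runs on the driver. RunningCount is a hypothetical helper for that total, and the batch sizes in main are made-up stand-ins for real batches. The total lives only in driver memory, so it would reset if the driver restarts.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RunningCount {

    // Driver-side running total of processed messages (hypothetical helper).
    private final AtomicLong total = new AtomicLong();

    // Call once per batch with that batch's size (e.g. rdd.count() inside foreachRDD);
    // returns the updated running total. AtomicLong keeps updates safe across threads.
    long addBatch(long batchCount) {
        return total.addAndGet(batchCount);
    }

    long total() {
        return total.get();
    }

    public static void main(String[] args) {
        RunningCount counter = new RunningCount();
        // Simulated per-batch counts standing in for three streaming batches.
        long[] batches = {120, 0, 45};
        for (long n : batches) {
            System.out.println("running total: " + counter.addBatch(n));
        }
    }
}
```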
Partitioned Parquet based external table
Hi, I am using Spark 1.5.1. I have slightly modified this example to create a partitioned Parquet file: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
Instead of this line:
    schemaPeople.write().parquet("people.parquet");
I use this line:
    schemaPeople.write().partitionBy("country").parquet("/user/Ananda/people.parquet");
I have also updated the Person class to add a country attribute, and updated my input file accordingly. When I run this code in Spark, it seems to work: I can see the partition folder and the Parquet file inside it in HDFS, where I store this Parquet file. But when I create an external table in Hive, it does not work: "select * from person5" returns no rows. This is how I create the table:
    CREATE EXTERNAL TABLE person5(name string, age int, city string) PARTITIONED BY (country string) STORED AS PARQUET LOCATION '/user/ananda/people.parquet/';
When I create a non-partitioned table, it works fine. Please help if you have any idea. Regards, Anand.C
RE: Partitioned Parquet based external table
My primary interface to access the data is going to be Hive. I am planning to use Spark to ingest data (in the future I will use Spark Streaming, but for now it is just Spark SQL). Another group will analyze this data using Hive queries. For this scenario, the earlier suggestion seems to work. Regards, Anand.C
From: Michael Armbrust <mich...@databricks.com> Sent: Friday, November 13, 2015 2:25 AM To: Chandra Mohan, Ananda Vel Murugan Cc: Michal Klos; user Subject: Re: Partitioned Parquet based external table
Note that if you read in the table using sqlContext.read.parquet(...) or if you use saveAsTable(...), the partitions will be auto-discovered. However, this is not compatible with Hive if you also want to be able to read the data there.
On Thu, Nov 12, 2015 at 6:23 AM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:
Thank you, it works perfectly now. I enabled dynamic partitioning and then ran "msck repair table your_table". Regards, Anand.C
From: Michal Klos <michal.klo...@gmail.com> Sent: Thursday, November 12, 2015 6:32 PM To: Chandra Mohan, Ananda Vel Murugan Cc: user Subject: Re: Partitioned Parquet based external table
You must add the partitions to the Hive table with something like "alter table your_table add if not exists partition (country='us');". If you have dynamic partitioning turned on, you can run 'msck repair table your_table' to recover the partitions. I would recommend reviewing the Hive documentation on partitions. M
On Nov 12, 2015, at 6:38 AM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:
Hi, I am using Spark 1.5.1. I have slightly modified this example to create a partitioned Parquet file: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
Instead of this line:
    schemaPeople.write().parquet("people.parquet");
I use this line:
    schemaPeople.write().partitionBy("country").parquet("/user/Ananda/people.parquet");
I have also updated the Person class to add a country attribute, and updated my input file accordingly. When I run this code in Spark, it seems to work: I can see the partition folder and the Parquet file inside it in HDFS, where I store this Parquet file. But when I create an external table in Hive, it does not work: "select * from person5" returns no rows. This is how I create the table:
    CREATE EXTERNAL TABLE person5(name string, age int, city string) PARTITIONED BY (country string) STORED AS PARQUET LOCATION '/user/ananda/people.parquet/';
When I create a non-partitioned table, it works fine. Please help if you have any idea. Regards, Anand.C
RE: Partitioned Parquet based external table
Thank you, it works perfectly now. I enabled dynamic partitioning and then ran "msck repair table your_table". Regards, Anand.C
From: Michal Klos <michal.klo...@gmail.com> Sent: Thursday, November 12, 2015 6:32 PM To: Chandra Mohan, Ananda Vel Murugan Cc: user Subject: Re: Partitioned Parquet based external table
You must add the partitions to the Hive table with something like "alter table your_table add if not exists partition (country='us');". If you have dynamic partitioning turned on, you can run 'msck repair table your_table' to recover the partitions. I would recommend reviewing the Hive documentation on partitions. M
On Nov 12, 2015, at 6:38 AM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:
Hi, I am using Spark 1.5.1. I have slightly modified this example to create a partitioned Parquet file: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
Instead of this line:
    schemaPeople.write().parquet("people.parquet");
I use this line:
    schemaPeople.write().partitionBy("country").parquet("/user/Ananda/people.parquet");
I have also updated the Person class to add a country attribute, and updated my input file accordingly. When I run this code in Spark, it seems to work: I can see the partition folder and the Parquet file inside it in HDFS, where I store this Parquet file. But when I create an external table in Hive, it does not work: "select * from person5" returns no rows. This is how I create the table:
    CREATE EXTERNAL TABLE person5(name string, age int, city string) PARTITIONED BY (country string) STORED AS PARQUET LOCATION '/user/ananda/people.parquet/';
When I create a non-partitioned table, it works fine. Please help if you have any idea. Regards, Anand.C
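Michal's first option (adding each partition explicitly) can be scripted. partitionBy writes Hive-style directories named column=value under the output path, so one ALTER TABLE ... ADD IF NOT EXISTS PARTITION statement can be generated per directory, in the same form as his example. The sketch below only builds the statement strings; the directory names in main are hard-coded stand-ins for a real listing (which you might get from hdfs dfs -ls or Hadoop's FileSystem.listStatus), person5 is the table name from the original post, and PartitionDdl is an illustrative name. It assumes partition values contain no single quotes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionDdl {

    // Builds "ALTER TABLE t ADD IF NOT EXISTS PARTITION (col='value')" for each
    // Hive-style directory name such as "country=us"; entries without '=' are skipped.
    static List<String> addPartitionStatements(String table, List<String> dirNames) {
        List<String> statements = new ArrayList<>();
        for (String dir : dirNames) {
            int eq = dir.indexOf('=');
            if (eq <= 0) {
                continue; // e.g. "_SUCCESS" marker files are not partitions
            }
            String column = dir.substring(0, eq);
            String value = dir.substring(eq + 1);
            statements.add("ALTER TABLE " + table + " ADD IF NOT EXISTS PARTITION ("
                    + column + "='" + value + "')");
        }
        return statements;
    }

    public static void main(String[] args) {
        // Hypothetical listing of the table's LOCATION directory.
        List<String> dirs = Arrays.asList("_SUCCESS", "country=india", "country=us");
        for (String s : addPartitionStatements("person5", dirs)) {
            System.out.println(s + ";");
        }
    }
}
```

Without a LOCATION clause, Hive places each added partition under the table location using the same column=value path, which is where Spark already wrote the files.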
RE: Get the previous state string in Spark streaming
Hi, Thanks for the response. We are trying to implement something similar to what is discussed in the following Stack Overflow post: http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation We are doing it in Java, while the accepted answer (the second answer) in that post is in Scala. We wrote our Java code using the Scala code as a reference, but we get an exception on the highlighted line, i.e. return Optional.of(events.add(state.toString())); Specifically, it happens when we call events.add():
    final Function2<List, Optional<List>, Optional<List>> updateFunc =
        new Function2<List, Optional<List>, Optional<List>>() {
            public Optional<List> call(List events, Optional<List> state) throws Exception {
                // TODO Auto-generated method stub
                if (state.toString() == null)
                    return Optional.of(events);
                else {
                    // UnsupportedOperationException here
                    return Optional.of(events.add(state.toString()));
                }
            }
        };
Please let us know if you need more details. Unfortunately we are not in a position to share the whole code. Thanks, Regards, Anand/Yogesh
From: Tathagata Das <t...@databricks.com> Sent: Friday, October 16, 2015 1:22 PM To: Chandra Mohan, Ananda Vel Murugan Cc: user Subject: Re: Get the previous state string in Spark streaming
It's hard to help without the stack trace associated with the UnsupportedOperationException.
On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:
One of my co-workers (Yogesh) tried to post this to the Spark mailing list, but it seems it did not get posted, so I am reposting it here. Please help.
Hi, I am new to Spark and was trying to do some experiments with it. I have a JavaPairDStream<String, List> RDD. I want to get the list of strings from its previous state. For that I use the updateStateByKey function as follows:
    final Function2<List, Optional<List>, Optional<List>> updateFunc =
        new Function2<List, Optional<List>, Optional<List>>() {
            public Optional<List> call(List arg0, Optional<List> arg1) throws Exception {
                // TODO Auto-generated method stub
                if (arg1.toString() == null)
                    return Optional.of(arg0);
                else {
                    arg0.add(arg1.toString());
                    return Optional.of(arg0);
                }
            }
        };
I want the function to append the new list of strings to the previous list and return the new list, but I am not able to do so. I am getting a java.lang.UnsupportedOperationException. Can anyone help me get the desired output?
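Two details in the posted function probably explain the behaviour. First, state.toString() never returns null (an absent Guava Optional prints as "Optional.absent()"), so the first branch is never taken; isPresent() is the usual check, and toString() yields the Optional's string form rather than the previous list's elements. Second, the values list that Spark 1.x's Java updateStateByKey passes in appears to be a read-only Java view of a Scala Seq, and add() on such a view throws UnsupportedOperationException. Without the stack trace Tathagata asked for this is an inference, but it fits the report. A safer pattern is to build a fresh ArrayList from the previous state plus the new values. merge is a hypothetical helper, and the unmodifiable list in main only stands in for Spark's read-only values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StateMerge {

    // Returns a NEW list: previous state (if any) followed by this batch's values.
    // Neither input is modified, so a read-only values list is fine.
    static List<String> merge(List<String> newValues, List<String> previousState) {
        List<String> merged = new ArrayList<>();
        if (previousState != null) {
            merged.addAll(previousState);
        }
        merged.addAll(newValues);
        return merged;
    }

    // Inside a Function2<List<String>, Optional<List<String>>, Optional<List<String>>>
    // (Guava Optional in Spark 1.x) the call would look roughly like:
    //   return Optional.of(merge(events, state.isPresent() ? state.get() : null));

    public static void main(String[] args) {
        // Stand-in for the read-only list Spark hands to the update function.
        List<String> events = Collections.unmodifiableList(Arrays.asList("e3", "e4"));
        List<String> previous = Arrays.asList("e1", "e2");
        try {
            events.add("x");
        } catch (UnsupportedOperationException e) {
            System.out.println("add() on a read-only list throws " + e.getClass().getSimpleName());
        }
        System.out.println(merge(events, previous)); // [e1, e2, e3, e4]
    }
}
```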
Get the previous state string in Spark streaming
One of my co-workers (Yogesh) tried to post this to the Spark mailing list, but it seems it did not get posted, so I am reposting it here. Please help.
Hi, I am new to Spark and was trying to do some experiments with it. I have a JavaPairDStream<String, List> RDD. I want to get the list of strings from its previous state. For that I use the updateStateByKey function as follows:
    final Function2<List, Optional<List>, Optional<List>> updateFunc =
        new Function2<List, Optional<List>, Optional<List>>() {
            public Optional<List> call(List arg0, Optional<List> arg1) throws Exception {
                // TODO Auto-generated method stub
                if (arg1.toString() == null)
                    return Optional.of(arg0);
                else {
                    arg0.add(arg1.toString());
                    return Optional.of(arg0);
                }
            }
        };
I want the function to append the new list of strings to the previous list and return the new list, but I am not able to do so. I am getting a "java.lang.UnsupportedOperationException" error. Can anyone help me get the desired output?
spark streaming filestream API
Hi All, I have an HDFS directory which I want to monitor; whenever a new file appears in it, I want to parse that file and load its contents into a Hive table. The file format is proprietary and I have Java parsers for it. I am building a Spark Streaming application for this workflow. For this, I found the JavaStreamingContext.fileStream API. It takes four arguments: directory path, key class, value class and input format. What should the key and value classes be? Please suggest. Thank you. Regards, Anand.C
RE: spark streaming filestream API
Hi, Thanks for your response. My input format is one I created to handle each file as a whole, i.e. WholeFileInputFormat. I wrote it based on this example: https://code.google.com/p/hadoop-course/source/browse/HadoopSamples/src/main/java/mr/wholeFile/WholeFileInputFormat.java?r=3 In this case, the key would be NullWritable and the value would be BytesWritable, right? Unfortunately my files are binary, not text files. Regards, Anand.C
From: Akhil Das <ak...@sigmoidanalytics.com> Sent: Wednesday, October 14, 2015 5:31 PM To: Chandra Mohan, Ananda Vel Murugan Cc: user Subject: Re: spark streaming filestream API
Key and Value are the ones that you are using with your InputFormat. E.g.:
    JavaPairInputDStream<LongWritable, Text> lines = jssc.fileStream("/sigmoid", LongWritable.class, Text.class, TextInputFormat.class);
TextInputFormat uses LongWritable as the key class and Text as the value class. If your data is plain CSV or text data, you can use jssc.textFileStream("/sigmoid") without worrying about the InputFormat, key and value classes. Thanks, Best Regards
On Wed, Oct 14, 2015 at 5:12 PM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:
Hi All, I have an HDFS directory which I want to monitor; whenever a new file appears in it, I want to parse that file and load its contents into a Hive table. The file format is proprietary and I have Java parsers for it. I am building a Spark Streaming application for this workflow. For this, I found the JavaStreamingContext.fileStream API. It takes four arguments: directory path, key class, value class and input format. What should the key and value classes be? Please suggest. Thank you. Regards, Anand.C
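Akhil's rule can be checked mechanically: the key and value classes to pass to fileStream are the two type arguments your input format declares for its FileInputFormat<K, V> superclass. The sketch below reads them by reflection. To keep it runnable without Hadoop on the classpath, FakeFileInputFormat, FakeNullWritable, FakeBytesWritable and FakeWholeFileInputFormat are hypothetical stand-ins. Pointed at a real WholeFileInputFormat declared as extending FileInputFormat<NullWritable, BytesWritable>, the same method would report those two classes, matching the guess in the reply above.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class InputFormatTypes {

    // Walks up the superclass chain and returns the first pair of type arguments found,
    // e.g. [NullWritable, BytesWritable] for "extends FileInputFormat<NullWritable, BytesWritable>".
    // Returns null if no two-argument generic superclass is declared.
    static Type[] keyValueTypes(Class<?> inputFormat) {
        for (Class<?> c = inputFormat; c != null && c != Object.class; c = c.getSuperclass()) {
            Type sup = c.getGenericSuperclass();
            if (sup instanceof ParameterizedType) {
                Type[] args = ((ParameterizedType) sup).getActualTypeArguments();
                if (args.length == 2) {
                    return args;
                }
            }
        }
        return null;
    }

    // Hypothetical stand-ins for the Hadoop classes, so this compiles without Hadoop.
    static class FakeFileInputFormat<K, V> {}
    static class FakeNullWritable {}
    static class FakeBytesWritable {}
    static class FakeWholeFileInputFormat extends FakeFileInputFormat<FakeNullWritable, FakeBytesWritable> {}

    public static void main(String[] args) {
        System.out.println(Arrays.toString(keyValueTypes(FakeWholeFileInputFormat.class)));
    }
}
```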
RE: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, Thanks for the response. I was looking for a Java solution. I will check the Scala and Python ones. Regards, Anand.C
From: Todd Nist <tsind...@gmail.com> Sent: Tuesday, May 19, 2015 6:17 PM To: Chandra Mohan, Ananda Vel Murugan Cc: ayan guha; user Subject: Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
I believe you're looking for df.na.fill in Scala; in the PySpark module it is fillna (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html). From the docs:
    df4.fillna({'age': 50, 'name': 'unknown'}).show()
    age height name
    10  80     Alice
    5   null   Bob
    50  null   Tom
    50  null   unknown
On Mon, May 18, 2015 at 11:01 PM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:
Hi, Thanks for the response. But I could not see a fillna function in the DataFrame class. Is it available only in some specific version of Spark SQL? This is what I have in my pom.xml:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>
Regards, Anand.C
From: ayan guha <guha.a...@gmail.com> Sent: Monday, May 18, 2015 5:19 PM To: Chandra Mohan, Ananda Vel Murugan; user Subject: Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, Give the DataFrame.fillna function a try to fill up the missing column. Best, Ayan
On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:
Hi, I am using spark-sql to read a CSV file and write it as a Parquet file. I am building the schema using the following code:
    String schemaString = "a b c";
    List<StructField> fields = new ArrayList<StructField>();
    MetadataBuilder mb = new MetadataBuilder();
    mb.putBoolean("nullable", true);
    Metadata m = mb.build();
    for (String fieldName : schemaString.split(" ")) {
        fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
    }
    StructType schema = DataTypes.createStructType(fields);
Some of the rows in my input CSV do not contain three columns. After building my JavaRDD<Row>, I create a DataFrame from the RDD and the schema as shown below:
    DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);
Finally, I try to save it as a Parquet file:
    darDataFrame.saveAsParquetFile("/home/anand/output.parquet");
I get this error when saving it as a Parquet file:
    java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (3 > 2)
I understand the reason behind this error: some rows in my Row RDD do not contain three elements because some rows in my input CSV do not contain three columns. But while building the schema, I specify every field as nullable, so I believe it should not throw this error. Can anyone help me fix this error? Thank you. Regards, Anand.C
-- Best Regards, Ayan Guha
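Since a Java solution was requested: marking a field nullable only allows its value to be null; each Row still has to carry one value (possibly null) per schema field. One option is therefore to pad short CSV lines with nulls before building the rows. The sketch assumes a comma delimiter and numeric columns (matching DoubleType in the schema). toRowValues is a hypothetical helper whose result would be passed to RowFactory.create(...) inside the map that builds the JavaRDD<Row>.

```java
import java.util.Arrays;

public class PadRow {

    // Converts one CSV line into exactly `width` values: a Double for each non-empty
    // column, null for empty or missing columns. Assumes a comma delimiter.
    static Object[] toRowValues(String line, int width) {
        // A negative limit keeps trailing empty strings, so "1.0,2.0," gives 3 tokens.
        String[] tokens = line.split(",", -1);
        Object[] values = new Object[width]; // all elements start as null
        for (int i = 0; i < width && i < tokens.length; i++) {
            String t = tokens[i].trim();
            if (!t.isEmpty()) {
                values[i] = Double.parseDouble(t);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // In the Spark job, roughly: lines.map(line -> RowFactory.create(toRowValues(line, 3)))
        System.out.println(Arrays.toString(toRowValues("1.5,2.5", 3))); // [1.5, 2.5, null]
    }
}
```

Every Row built this way has exactly three fields, so the Parquet writer no longer sees rows shorter than the schema.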
RE: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, Thanks for the response. But I could not see a fillna function in the DataFrame class. Is it available only in some specific version of Spark SQL? This is what I have in my pom.xml:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>
Regards, Anand.C
From: ayan guha <guha.a...@gmail.com> Sent: Monday, May 18, 2015 5:19 PM To: Chandra Mohan, Ananda Vel Murugan; user Subject: Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, Give the DataFrame.fillna function a try to fill up the missing column. Best, Ayan
On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan <ananda.muru...@honeywell.com> wrote:
Hi, I am using spark-sql to read a CSV file and write it as a Parquet file. I am building the schema using the following code:
    String schemaString = "a b c";
    List<StructField> fields = new ArrayList<StructField>();
    MetadataBuilder mb = new MetadataBuilder();
    mb.putBoolean("nullable", true);
    Metadata m = mb.build();
    for (String fieldName : schemaString.split(" ")) {
        fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
    }
    StructType schema = DataTypes.createStructType(fields);
Some of the rows in my input CSV do not contain three columns. After building my JavaRDD<Row>, I create a DataFrame from the RDD and the schema as shown below:
    DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);
Finally, I try to save it as a Parquet file:
    darDataFrame.saveAsParquetFile("/home/anand/output.parquet");
I get this error when saving it as a Parquet file:
    java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (3 > 2)
I understand the reason behind this error: some rows in my Row RDD do not contain three elements because some rows in my input CSV do not contain three columns. But while building the schema, I specify every field as nullable, so I believe it should not throw this error. Can anyone help me fix this error? Thank you. Regards, Anand.C
-- Best Regards, Ayan Guha
Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, I am using spark-sql to read a CSV file and write it as a Parquet file. I am building the schema using the following code:
    String schemaString = "a b c";
    List<StructField> fields = new ArrayList<StructField>();
    MetadataBuilder mb = new MetadataBuilder();
    mb.putBoolean("nullable", true);
    Metadata m = mb.build();
    for (String fieldName : schemaString.split(" ")) {
        fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
    }
    StructType schema = DataTypes.createStructType(fields);
Some of the rows in my input CSV do not contain three columns. After building my JavaRDD<Row>, I create a DataFrame from the RDD and the schema as shown below:
    DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);
Finally, I try to save it as a Parquet file:
    darDataFrame.saveAsParquetFile("/home/anand/output.parquet");
I get this error when saving it as a Parquet file:
    java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (3 > 2)
I understand the reason behind this error: some rows in my Row RDD do not contain three elements because some rows in my input CSV do not contain three columns. But while building the schema, I specify every field as nullable, so I believe it should not throw this error. Can anyone help me fix this error? Thank you. Regards, Anand.C