Hi,

Following your answer I was able to make it work.
FYI:
Basically, the solution is to create the table in Hive manually, using a SQL 
"CREATE TABLE" command.
When doing a saveAsTable, the Hive metastore doesn't get the info of the df.
So now my flow is:

  *   Create a dataframe.
  *   If it is the first time I see the table, I generate a CREATE TABLE from 
the df.schema.fields.
  *   If it is not:
     *   I diff my df schema against the myTable schema.
     *   I run a SQL "ALTER TABLE ... ADD COLUMNS" on the table for the columns 
it is missing.
     *   I use df.withColumn for each column that is missing from the df.
  *   Then I use df.insertInto myTable.
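
A minimal sketch of the DDL-generation part of this flow, under stated 
assumptions: columns are modelled as plain (name, Hive type) pairs, and the 
object and method names (SchemaSync, createTableSql, missing, addColumnsSql) 
are hypothetical, not taken from the original code. In Spark the pairs could 
probably be derived from df.schema.fields, for example with 
f => (f.name, f.dataType.simpleString).

```scala
object SchemaSync {
  // Hypothetical column model: (column name, Hive type name).
  type Column = (String, String)

  // DDL for the first time the table is seen. STORED AS ORC is an assumption
  // based on the move to ORC mentioned in this message.
  def createTableSql(table: String, cols: Seq[Column]): String =
    cols.map { case (n, t) => s"$n $t" }
      .mkString(s"CREATE TABLE $table (", ", ", ") STORED AS ORC")

  // Columns present in `left` but absent from `right`, compared by name,
  // ignoring case.
  def missing(left: Seq[Column], right: Seq[Column]): Seq[Column] = {
    val names = right.map(_._1.toLowerCase).toSet
    left.filterNot { case (n, _) => names.contains(n.toLowerCase) }
  }

  // DDL adding the columns that only the DataFrame has; None if there are none.
  def addColumnsSql(table: String,
                    dfCols: Seq[Column],
                    tableCols: Seq[Column]): Option[String] = {
    val toAdd = missing(dfCols, tableCols)
    if (toAdd.isEmpty) None
    else Some(toAdd.map { case (n, t) => s"$n $t" }
      .mkString(s"ALTER TABLE $table ADD COLUMNS (", ", ", ")"))
  }
}
```

The reverse diff, SchemaSync.missing(tableCols, dfCols), gives the columns that 
exist only in the table; those are the ones to add to the df with withColumn 
before the insert, so that both sides end up with the same number of columns.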

I also migrated from Parquet to ORC; not sure whether this has an impact or not.

Thank you for your help.

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 11:54 PM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: alter table add columns aternatives or hive refresh

This should work. Make sure that you use HiveContext.sql and sqlContext 
correctly.

This is an example in Spark: reading a CSV file, doing some manipulation, 
creating a temp table, saving the data as an ORC file, adding another column, 
and inserting values into the table in Hive with default values for the new rows.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
//
  val conf = new SparkConf().
               setAppName("ImportCSV").
               setMaster("local[12]").
               set("spark.driver.allowMultipleContexts", "true").
               set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  //
  // Get a DF first based on Databricks CSV libraries
  //
  val df = HiveContext.read.format("com.databricks.spark.csv").
           option("inferSchema", "true").
           option("header", "true").
           load("/data/stg/table2")
  //
  // Next filter out empty rows (last column has to be > "") and get rid of the
  // "?" special character. Also get rid of "," in money fields
  // Example csv cell £2,500.00 --> need to transform to plain 2500.00
  //
  val a = df.
          filter(col("Total") > "").
          map(x => (x.getString(0), x.getString(1),
                    x.getString(2).substring(1).replace(",", "").toDouble,
                    x.getString(3).substring(1).replace(",", "").toDouble,
                    x.getString(4).substring(1).replace(",", "").toDouble))
   //
   // convert this RDD to DF and create a Spark temporary table
   //
   a.toDF.registerTempTable("tmp")
  //
  // Need to create and populate target ORC table t3 in database test in Hive
  //
  HiveContext.sql("use test")
  HiveContext.sql("DROP TABLE IF EXISTS test.t3")
  var sqltext : String = ""
  sqltext = """
  CREATE TABLE test.t3 (
   INVOICENUMBER          String
  ,PAYMENTDATE            String
  ,NET                    DOUBLE
  ,VAT                    DOUBLE
  ,TOTAL                  DOUBLE
  )
  COMMENT 'from csv file from excel sheet'
  STORED AS ORC
  TBLPROPERTIES ( "orc.compress"="ZLIB" )
  """
  HiveContext.sql(sqltext)
  // Note you can only see Spark temporary table in sqlContext NOT HiveContext
  val results = sqlContext.sql("SELECT * FROM tmp")
  // clean up the file in HDFS directory first if exists
  val hadoopConf = new org.apache.hadoop.conf.Configuration()
  val hdfs = org.apache.hadoop.fs.FileSystem.get(
               new java.net.URI("hdfs://rhes564:9000"), hadoopConf)
  // The path for the Hive table just created
  val output = "hdfs://rhes564:9000/user/hive/warehouse/test.db/t3"
  try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch { case _ : Throwable => { } }

  results.write.format("orc").save(output)
//
  sqlContext.sql("ALTER TABLE test.t3 ADD COLUMNS (new_col VARCHAR(30))")
  sqlContext.sql("INSERT INTO test.t3 SELECT *, 'London' FROM tmp")
  HiveContext.sql("SELECT * FROM test.t3 ORDER BY 1").collect.foreach(println)
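
The currency clean-up used in the map step above (drop the leading symbol, 
strip the "," separators, convert to Double) can be pulled out into a small 
helper. This is only a sketch: the name Money.parse is hypothetical and not 
part of the original code, and unlike the original it returns None for cells 
that do not parse instead of throwing.

```scala
import scala.util.Try

object Money {
  // Drops the first character (assumed to be the currency symbol), removes
  // thousands separators, then parses the remainder as a Double.
  // Empty or malformed cells yield None.
  def parse(cell: String): Option[Double] =
    Try(cell.trim.drop(1).replace(",", "").toDouble).toOption
}
```

Inside the map, a call such as Money.parse(x.getString(2)) could then replace 
the repeated substring/replace/toDouble chain, with the caller deciding what to 
do with a None.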

HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 11 April 2016 at 01:36, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Your solution works in Hive, but not in Spark, even if I use the Hive context.
I tried to create a temp table and then ran this query:
 - sqlContext.sql("insert into table myTable select * from myTable_temp")
But I still get the same error.

thanks

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 12:25 PM
To: "user @spark" <user@spark.apache.org>

Subject: Re: alter table add columns aternatives or hive refresh

Hi,

I am confining myself to Hive tables. As I stated before, I have not tried it 
in Spark, so I stand corrected.

Let us try this simple test in Hive


-- Create table
hive> create table testme(col1 int);
OK
--insert a row
hive> insert into testme values(1);

Loading data to table test.testme
OK
-- Add a new column to testme
hive> alter table testme add columns (new_col varchar(30));
OK
Time taken: 0.055 seconds

-- Expect one row here

hive> select * from testme;
OK
1       NULL
-- Add a new row including values for new_col. This should work
hive> insert into testme values(1,'London');
Loading data to table test.testme
OK
hive> select * from testme;
OK
1       NULL
1       London
Time taken: 0.074 seconds, Fetched: 2 row(s)
-- Now update the new column
hive> update testme set col2 = 'NY';
FAILED: SemanticException [Error 10297]: Attempt to do update or delete on 
table test.testme that does not use an AcidOutputFormat or is not bucketed

So this is Hive. You can add new rows including values for the new column but 
cannot update the null values. Will this work for you?

HTH


Dr Mich Talebzadeh






On 10 April 2016 at 19:34, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
So basically you are telling me that I need to recreate the table and re-insert 
everything every time I update a column?
I understand the constraints, but that solution doesn't look good to me. I am 
updating the schema every day and the table holds a couple of TB of data.

Do you see any other option that would let me avoid moving TBs of data every day?

Thanks for your answer

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 3:41 AM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: alter table add columns aternatives or hive refresh

I have not tried it on Spark, but a column added in Hive to an existing table 
cannot be updated for existing rows. In other words, the new column is set to 
null, which does not require any change to the length of the existing files.

So basically, as I understand it, when a column is added to an existing table:

1.    The metadata for the underlying table will be updated
2.    The new column will have a null value by default
3.    The existing rows cannot have the new column updated to a non-null value
4.    New rows can have non-null values set for the new column
5.    No SQL operation can be done on that column. For example select * from 
<TABLE> where new_column IS NOT NULL
6.    The easiest option is to create a new table with the new column and do an 
insert/select from the existing table with values set for the new column
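
Option 6 can be sketched as a pair of generated HiveQL statements. This is a 
minimal illustration, not a tested migration script: the object 
RebuildWithColumn, its parameters, and the target table name used in the 
example below (testme_v2) are all hypothetical.

```scala
object RebuildWithColumn {
  // Returns two HiveQL statements: one creating the wider table, and one
  // copying every existing row across with a literal value for the new column.
  def statements(source: String,
                 target: String,
                 existing: Seq[(String, String)], // (name, Hive type)
                 newCol: (String, String),        // (name, Hive type)
                 defaultLiteral: String): Seq[String] = {
    val ddl = (existing :+ newCol)
      .map { case (n, t) => s"$n $t" }
      .mkString(s"CREATE TABLE $target (", ", ", ")")
    val selectList =
      (existing.map(_._1) :+ s"$defaultLiteral AS ${newCol._1}").mkString(", ")
    Seq(ddl, s"INSERT INTO TABLE $target SELECT $selectList FROM $source")
  }
}
```

For a table with a single int column col1, calling 
RebuildWithColumn.statements("testme", "testme_v2", Seq(("col1", "int")), 
("new_col", "varchar(30)"), "'London'") yields a CREATE TABLE for testme_v2 
followed by an INSERT ... SELECT that fills new_col with 'London' for every 
copied row. The cost is that all existing data is rewritten.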

HTH


Dr Mich Talebzadeh






On 10 April 2016 at 05:06, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
I am trying to add columns to a table that I created with the "saveAsTable" API.
I update the columns using sqlContext.sql('alter table myTable add columns 
(mycol string)').
The next time I create a df and save it in the same table, with the new columns, 
I get:
"ParquetRelation requires that the query in the SELECT clause of the INSERT 
INTO/OVERWRITE statement generates the same number of columns as its schema."

Also, these two commands don't return the same columns:
1. sqlContext.table('myTable').schema.fields    <-- wrong result
2. sqlContext.sql('show columns in mytable')    <-- good results

It seems to be a known bug: https://issues.apache.org/jira/browse/SPARK-9764 
(see related bugs)

But I am wondering, how else can I update the columns or make sure that Spark 
picks up the new columns?

I already tried refreshTable and restarting Spark.

thanks



