Re: How to write data into Hive partitioned Parquet table?
Support for dynamic partitioning is available in master and will be part of Spark 1.2.

On Thu, Oct 16, 2014 at 1:08 AM, Banias H <banias4sp...@gmail.com> wrote:
[...]
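For reference, a minimal sketch of what this should allow once 1.2 is out, assuming the table layout from the question below. The two SET commands mirror what Hive itself requires for dynamic-partition inserts, and the final count() forces execution, as in the original snippet; exact behavior may differ in the final release:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.api.java.JavaSchemaRDD;
    import org.apache.spark.sql.hive.api.java.JavaHiveContext;

    public class DynamicPartitionInsert {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("DynamicPartitionInsert");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            JavaHiveContext hiveCtx = new JavaHiveContext(ctx);

            // Hive requires these settings for dynamic-partition inserts;
            // Spark SQL forwards SET commands to the Hive configuration.
            hiveCtx.sql("SET hive.exec.dynamic.partition=true");
            hiveCtx.sql("SET hive.exec.dynamic.partition.mode=nonstrict");

            // The statement from the original question, unchanged.
            JavaSchemaRDD result = hiveCtx.sql(
                "INSERT INTO TABLE target_table PARTITION (partition_field) "
                + "SELECT field1, field2, partition_field FROM source_table "
                + "DISTRIBUTE BY field1 SORT BY field2");
            result.count(); // force execution of the insert
        }
    }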
Re: How to write data into Hive partitioned Parquet table?
An expert tipped me off that the "Unsupported language features in query" error I hit is due to Spark SQL not supporting dynamic partitions, and that I can call saveAsParquetFile() for each partition instead. My inefficient implementation is to:

    // 1. Run the query without the INSERT clause (which is what Spark SQL
    //    rejects) and without DISTRIBUTE BY field1 SORT BY field2.
    JavaSchemaRDD rawRdd = hiveCtx.sql(
        "SELECT field1, field2, partition_field FROM source_table");
    rawRdd.registerAsTempTable("temp");

    // 2. Get the list of distinct partition_field values.
    JavaSchemaRDD partFieldsRdd =
        hiveCtx.sql("SELECT DISTINCT partition_field FROM temp");

    // 3. For each partition_field value, run a query to get a
    //    JavaSchemaRDD, then save the result as a Parquet file.
    for (Row row : partFieldsRdd.toArray()) {
        String partitionVal = row.getString(0);
        hiveCtx.sql("SELECT * FROM temp WHERE partition_field=" + partitionVal)
               .saveAsParquetFile("partition_field=" + partitionVal);
    }

It ran and produced the desired output. However, Hive runs orders of magnitude faster than the code above. Anyone who can shed some light on a more efficient implementation is much appreciated. Many thanks.

Regards,
BH

On Tue, Oct 14, 2014 at 8:44 PM, Banias H <banias4sp...@gmail.com> wrote:
[...]
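One variant that may help, sketched only: the loop in step 3 re-reads source_table from Hive once per distinct partition value. Caching the intermediate table keeps that scan in memory, assuming the CACHE TABLE command is available (it has been in Spark SQL since 1.0). Class name and structure are illustrative; whether this closes the gap with Hive is untested:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.api.java.JavaSchemaRDD;
    import org.apache.spark.sql.api.java.Row;
    import org.apache.spark.sql.hive.api.java.JavaHiveContext;

    public class PartitionedParquetWriter {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("PartitionedParquetWriter");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            JavaHiveContext hiveCtx = new JavaHiveContext(ctx);

            // Same step 1 as above: materialize the query once as a temp table.
            JavaSchemaRDD rawRdd = hiveCtx.sql(
                "SELECT field1, field2, partition_field FROM source_table");
            rawRdd.registerAsTempTable("temp");

            // Keep the scanned rows in memory so each per-partition query
            // below filters cached data instead of re-reading source_table.
            hiveCtx.sql("CACHE TABLE temp");

            JavaSchemaRDD partFieldsRdd =
                hiveCtx.sql("SELECT DISTINCT partition_field FROM temp");

            for (Row row : partFieldsRdd.collect()) {
                String partitionVal = row.getString(0);
                // NOTE: the unquoted value assumes a numeric partition
                // column, as in the original; quote it for string columns.
                hiveCtx.sql("SELECT * FROM temp WHERE partition_field=" + partitionVal)
                       .saveAsParquetFile("partition_field=" + partitionVal);
            }

            hiveCtx.sql("UNCACHE TABLE temp");
        }
    }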
How to write data into Hive partitioned Parquet table?
Hi,

I am still new to Spark; sorry if similar questions have been asked here before. I am trying to read a Hive table, run a query, and save the result into a Hive partitioned Parquet table. For example, I was able to run the following in Hive:

    INSERT INTO TABLE target_table PARTITION (partition_field)
    SELECT field1, field2, partition_field FROM source_table
    DISTRIBUTE BY field1 SORT BY field2

But when I tried running it in spark-sql, it gave me the following error:

    java.lang.RuntimeException: Unsupported language features in query: INSERT INTO TABLE ...

I also tried the following Java code and saw the same error:

    SparkConf sparkConf = new SparkConf().setAppName("Example");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaHiveContext hiveCtx = new JavaHiveContext(ctx);
    JavaSchemaRDD rdd = hiveCtx.sql(
        "INSERT INTO TABLE target_table PARTITION (partition_field) "
        + "SELECT field1, field2, partition_field FROM source_table "
        + "DISTRIBUTE BY field1 SORT BY field2");
    ...
    rdd.count(); // Just for running the query

If I take INSERT INTO TABLE target_table PARTITION (partition_field) out of the SQL statement and run the rest in hiveCtx.sql(), I get an RDD, but then the only option seems to be rdd.saveAsParquetFile("target_table_location"), and that output is not partitioned correctly.

Any help is much appreciated. Thanks.

Regards,
BH
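For concreteness, the fallback described in the last paragraph looks roughly like this (a sketch against the Spark 1.1 Java API; class name and structure are illustrative):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.api.java.JavaSchemaRDD;
    import org.apache.spark.sql.hive.api.java.JavaHiveContext;

    public class UnpartitionedFallback {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("UnpartitionedFallback");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            JavaHiveContext hiveCtx = new JavaHiveContext(ctx);

            // The same query with the INSERT clause removed parses fine...
            JavaSchemaRDD rdd = hiveCtx.sql(
                "SELECT field1, field2, partition_field FROM source_table "
                + "DISTRIBUTE BY field1 SORT BY field2");

            // ...but saveAsParquetFile writes everything into one directory:
            // no partition_field=<value> subdirectories are created, so Hive
            // cannot treat the output as a partitioned table.
            rdd.saveAsParquetFile("target_table_location");
        }
    }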