I got tipped by an expert that the error of "Unsupported language features
in query" that I had was due to the fact that SparkSQL does not support
dynamic partitions, and I can do saveAsParquetFile() for each partition.

My inefficient implementation is to:

//1. run the query without  DISTRIBUTE BY field1 SORT BY field2.
JavaSchemaRDD rawRdd = hiveCtx.sql("INSERT INTO TABLE target_table
PARTITION (partition_field) select field1, field2, partition_field FROM
source_table");
rawRdd.registerAsTempTable("temp");

//2. Get a list of unique partition_field values
JavaSchemaRDD partFieldsRdd = hiveCtx.sql("SELECT DISTINCT partition_field
FROM temp");

//3. Iterate each partition_field value. Run a query to get JavaSchemaRDD.
Then save the result as ParquetFile
for (Row row : partFieldsRdd.toArray()) {
    String partitionVal = row.toString(0);
    hiveCtx.sql("SELECT * FROM temp WHERE partition_field="+partitionVal).
saveAsParquetFile("partition_field="+partitionVal);
}

It ran and produced the desired output. However Hive runs orders of
magnitude faster than the code above. Anyone who can shed some lights on a
more efficient implementation is much appreciated.  Many thanks.

Regards,
BH

On Tue, Oct 14, 2014 at 8:44 PM, Banias H <banias4sp...@gmail.com> wrote:

> Hi,
>
> I am still new to Spark. Sorry if similar questions are asked here before.
> I am trying to read a Hive table; then run a query and save the result into
> a Hive partitioned Parquet table.
>
> For example, I was able to run the following in Hive:
> INSERT INTO TABLE target_table PARTITION (partition_field) select field1,
> field2, partition_field FROM source_table DISTRIBUTE BY field1 SORT BY
> field2
>
> But when I tried running it in spark-sql, it gave me the following error:
>
> java.lang.RuntimeException:
> Unsupported language features in query: INSERT INTO TABLE ...
>
> I also tried the following Java code and I saw the same error:
>
> SparkConf sparkConf = new SparkConf().setAppName("Example");
> JavaSparkContext ctx = new JavaSparkContext(sparkConf);
> JavaHiveContext hiveCtx = new JavaHiveContext(ctx);
> JavaSchemaRDD rdd = hiveCtx.sql("INSERT INTO TABLE target_table PARTITION
> (partition_field) select field1, field2, partition_field FROM source_table
> DISTRIBUTE BY field1 SORT BY field2");
> ...
> rdd.count(); //Just for running the query
>
> If I take out "INSERT INTO TABLE target_table PARTITION (partition_field)"
> from the sql statement and run that in hiveCtx.sql(), I got a RDD but I
> only seem to do rdd.saveAsParquetFile(target_table_location). But that is
> not partitioned correctly.
>
> Any help is much appreciated. Thanks.
>
> Regards,
> BH
>

Reply via email to