Saving Structured Streaming DF to Hive Partitioned table
Hi,

I want to load a stream of CSV files into a partitioned Hive table called myTable. I tried using Spark 2 Structured Streaming to do that:

    val spark = SparkSession
      .builder
      .appName("TrueCallLoade")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.max.dynamic.partitions", "2048")
      .config("hive.exec.max.dynamic.partitions.pernode", "256")
      .getOrCreate()

    val df = spark.readStream
      .option("sep", ",")
      .option("header", "true")
      .schema(customSchema)
      .csv(fileDirectory)

The DataFrame has two columns called "dt" and "h", by which the Hive table is partitioned. writeStream can't stream directly to a Hive table, so I decided to use

    val query = df.writeStream
      .queryName("LoadedCSVData")
      .outputMode("Append")
      .format("memory")
      .start()

and then

    spark.sql("INSERT INTO myTable SELECT * FROM LoadedCSVData")

but this doesn't seem to insert anything. Any idea how I can achieve that?

Nimrod
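One likely reason the above inserts nothing: the memory sink only materializes results into an in-memory table on the driver, and the follow-up INSERT INTO runs exactly once, right after start(), typically before any micro-batch has arrived. A minimal sketch of an alternative, assuming a Spark version with foreachBatch (2.4+), that myTable already exists partitioned by (dt, h), and a placeholder checkpoint path:

    import org.apache.spark.sql.DataFrame

    val query = df.writeStream
      .outputMode("append")
      .option("checkpointLocation", "/tmp/myTable-checkpoint") // placeholder path
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // Each micro-batch is a static DataFrame, so the normal batch write
        // API applies; insertInto matches columns by position and relies on
        // dynamic partitioning for dt and h.
        batch.write.mode("append").insertInto("myTable")
      }
      .start()

On older Spark 2.x, a file sink writing Parquet under the table's location with .partitionBy("dt", "h"), followed by MSCK REPAIR TABLE (or ALTER TABLE ... ADD PARTITION) to register new partitions, is a common workaround.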
CSV DStream to Hive
Hi all,

I have a DStream that contains very long comma-separated values. I want to convert this DStream to a DataFrame. I thought of calling split on each line in the underlying RDDs and then toDF, but I can't get it to work. Can anyone help me here?

Nimrod
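A minimal sketch of that split-and-convert approach, assuming a DStream[String] of CSV lines; the two-field schema, column names, and target table are placeholders for the real ones:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.streaming.dstream.DStream

    def csvStreamToHive(lines: DStream[String], spark: SparkSession): Unit = {
      // Placeholder schema: a real job would list every column in the CSV.
      val schema = StructType(Seq(
        StructField("dt", StringType),
        StructField("payload", StringType)))
      lines.foreachRDD { rdd =>
        // split with limit -1 keeps trailing empty fields, so every row has
        // the same number of columns the schema expects
        val rows = rdd.map(_.split(",", -1)).map(fields => Row.fromSeq(fields))
        val df = spark.createDataFrame(rows, schema)
        df.write.mode("append").insertInto("myTable") // assumes the table exists
      }
    }

toDF on the RDD also works (import spark.implicits._ and map each split array to a tuple), but the Row-plus-schema form scales better when the lines have many columns.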
NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions writing to Hive
Hi,

I'm trying to write a DataFrame to a Hive partitioned table. This works fine from spark-shell, but when I use spark-submit I get the following exception:

Exception in thread "main" java.lang.NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(org.apache.hadoop.fs.Path, java.lang.String, java.util.Map, boolean, int, boolean, boolean, boolean)
    at java.lang.Class.getMethod(Class.java:1665)
    at org.apache.spark.sql.hive.client.Shim.findMethod(HiveShim.scala:114)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitionsMethod$lzycompute(HiveShim.scala:404)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitionsMethod(HiveShim.scala:403)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitions(HiveShim.scala:455)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:281)
    at org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:228)
    at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:227)
    at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:270)
    at org.apache.spark.sql.hive.client.ClientWrapper.loadDynamicPartitions(ClientWrapper.scala:561)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:225)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:127)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:276)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
    at com.pelephone.TrueCallLoader$.main(TrueCallLoader.scala:175)
    at com.pelephone.TrueCallLoader.main(TrueCallLoader.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can you help me find the problem?
Nimrod
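For context, this NoSuchMethodException usually indicates a Hive client version mismatch: Spark's Shim_v0_14 reflects for a loadDynamicPartitions signature that the Hive jars on the spark-submit classpath don't expose (vendor builds often patch this method), which would also explain why spark-shell and spark-submit behave differently. A hedged sketch of pinning the metastore client to match the cluster; the version string and jar path below are assumptions, not the cluster's real values:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Pin Spark's Hive client so the shim it selects matches the jars it
    // loads. "1.2.1" and the lib path are placeholders for the cluster's
    // actual Hive install.
    val conf = new SparkConf()
      .setAppName("TrueCallLoader")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.hive.metastore.jars", "/usr/hdp/current/hive-client/lib/*")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

The same settings can be passed as --conf flags to spark-submit, which is often exactly where the shell and submit classpaths diverge.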
Problem saving Hive table with Overwrite mode
Hi,

I'm trying to write a partitioned Parquet table and save it as a Hive table at a specific path. The code is in Java (column and table names are slightly different in my real code) and it is executed by Airflow, which calls spark-submit:

    aggregatedData.write()
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .partitionBy("schema_partition", "colC")
        .option("path", "hdfs://sandbox.hortonworks.com:8020/BatchZone/table.parquet")
        .saveAsTable("table_info");

However, I'm getting the following exception:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[colA#43,colB#44,colC#46], functions=[(min(colD#37L),mode=Final,isDistinct=false),(max(colE#42L),mode=Final,isDistinct=false),(max(colF#41L),mode=Final,isDistinct=false),(max(colG#38L),mode=Final,isDistinct=false),(max(colH#39L),mode=Final,isDistinct=false),(max(colI#40L),mode=Final,isDistinct=false)], output=[colA#43,colB#44,colD#51L,colE#52L,colC#46,colF#53L,colG#54L,colH#55L,colI#56L])
+- TungstenExchange hashpartitioning(colA#43,colB#44,colC#46,200), None
   +- TungstenAggregate(key=[colA#43,colB#44,colC#46], functions=[(min(colD#37L),mode=Partial,isDistinct=false),(max(colE#42L),mode=Partial,isDistinct=false),(max(colF#41L),mode=Partial,isDistinct=false),(max(colG#38L),mode=Partial,isDistinct=false),(max(colH#39L),mode=Partial,isDistinct=false),(max(colI#40L),mode=Partial,isDistinct=false)], output=[colA#43,colB#44,colC#46,min#73L,max#74L,max#75L,max#76L,max#77L,max#78L])
      +- Scan ParquetRelation[colE#42L,colA#43,colG#38L,colD#37L,colH#39L,colF#41L,colI#40L,colC#46,colB#44] InputPaths: hdfs://sandbox.hortonworks.com:8020/BatchZone/table.parquet

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:109)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at
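The plan above shows the aggregate scanning hdfs://sandbox.hortonworks.com:8020/BatchZone/table.parquet, the same path the Overwrite write is about to clear, which suggests the job is reading from the location it is overwriting; Spark generally cannot do both in one job. A minimal sketch of one workaround under that assumption (in Scala; the temp path and new table name are placeholders):

    import org.apache.spark.sql.{DataFrame, SaveMode}

    def overwriteSafely(aggregatedData: DataFrame): Unit = {
      // Write to a side location first so the job never deletes its own input.
      val tmpPath = "hdfs://sandbox.hortonworks.com:8020/BatchZone/table_new.parquet"
      aggregatedData.write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .partitionBy("schema_partition", "colC")
        .option("path", tmpPath)
        .saveAsTable("table_info_new")
      // Once the write has fully succeeded, swap the new output into place:
      // drop the old table and rename the directory (e.g. with Hadoop's
      // FileSystem.rename), or repoint downstream readers at table_info_new.
    }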