Saving Structured Streaming DF to Hive Partitioned table

2017-02-26 Thread nimrodo
Hi,

I want to load a stream of CSV files to a partitioned Hive table called
myTable.

I tried using Spark 2 Structured Streaming to do that:
val spark = SparkSession
  .builder
  .appName("TrueCallLoader")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.max.dynamic.partitions", "2048")
  .config("hive.exec.max.dynamic.partitions.pernode", "256")
  .getOrCreate()
val df = spark.readStream
  .option("sep", ",")
  .option("header", "true")
  .schema(customSchema)
  .csv(fileDirectory)

The DataFrame has two columns, "dt" and "h", by which the Hive table is partitioned.

writeStream can't directly stream to a Hive table, so I decided to use
val query = df.writeStream
  .queryName("LoadedCSVData")
  .outputMode("Append")
  .format("memory")
  .start()

and then
spark.sql("INSERT INTO myTable SELECT * FROM LoadedCSVData")

This doesn't seem to work. Any idea how I can achieve that?
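
One alternative I've been considering is skipping the memory sink and writing the stream directly as partitioned Parquet files under the table's location with the file sink. A rough, untested sketch (the output path, the checkpoint directory, and the assumption that myTable is an external table over that path are mine):

val query = df.writeStream
  .format("parquet")
  .partitionBy("dt", "h")
  .option("path", "/apps/hive/warehouse/mytable")            // assumed: the external table's location
  .option("checkpointLocation", "/tmp/checkpoints/myTable")  // assumed checkpoint directory
  .outputMode("append")
  .start()

New partitions would still need to be registered in the metastore afterwards (e.g. MSCK REPAIR TABLE myTable), so I'm not sure this is the right approach either.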

Nimrod



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Structured-Streaming-DF-to-Hive-Partitioned-table-tp28424.html



CSV DStream to Hive

2017-02-21 Thread nimrodo
Hi all,

I have a DStream that contains very long comma-separated values. I want to convert this DStream to a DataFrame. I thought of using split on the RDDs and toDF, however I can't get it to work.
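
Roughly what I'm attempting looks like this (a sketch with made-up column names; "lines" stands for my DStream[String] and "myTable" for the target Hive table):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

// Placeholder schema for the illustration; the real rows have many more fields
case class Record(colA: String, colB: String, colC: String)

def writeToHive(lines: DStream[String]): Unit = {
  lines.foreachRDD { rdd =>
    // Obtain (or reuse) a SparkSession per micro-batch and bring toDF() into scope
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    val df = rdd
      .map(_.split(","))                           // split each CSV line on commas
      .map(p => Record(p(0), p(1), p(2)))          // map the fields onto the case class
      .toDF()

    df.write.mode("append").insertInto("myTable")  // assumed existing Hive table
  }
}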

Can anyone help me here?

Nimrod





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-DStream-to-Hive-tp28410.html



NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions writing to Hive

2017-02-14 Thread nimrodo
Hi,

I'm trying to write a DataFrame to a partitioned Hive table. This works fine from spark-shell; however, when I use spark-submit I get the following exception:

Exception in thread "main" java.lang.NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(org.apache.hadoop.fs.Path, java.lang.String, java.util.Map, boolean, int, boolean, boolean, boolean)
    at java.lang.Class.getMethod(Class.java:1665)
    at org.apache.spark.sql.hive.client.Shim.findMethod(HiveShim.scala:114)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitionsMethod$lzycompute(HiveShim.scala:404)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitionsMethod(HiveShim.scala:403)
    at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitions(HiveShim.scala:455)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply(ClientWrapper.scala:562)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:281)
    at org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:228)
    at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:227)
    at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:270)
    at org.apache.spark.sql.hive.client.ClientWrapper.loadDynamicPartitions(ClientWrapper.scala:561)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:225)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:127)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:276)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
    at com.pelephone.TrueCallLoader$.main(TrueCallLoader.scala:175)
    at com.pelephone.TrueCallLoader.main(TrueCallLoader.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
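
For context, the call at TrueCallLoader.scala:175 is roughly of this shape (a sketch; the partition columns and the table name here are placeholders, and the real code differs):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

// Dynamic-partition append into an existing Hive table
df.write
  .mode(SaveMode.Append)
  .partitionBy("dt", "h")    // placeholder partition columns
  .saveAsTable("myTable")    // placeholder table name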

Can you help me find the problem?

Nimrod



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodException-org-apache-hadoop-hive-ql-metadata-Hive-loadDynamicPartitions-writing-to-Hive-tp28388.html



Problem saving Hive table with Overwrite mode

2016-07-13 Thread nimrodo
Hi,

I'm trying to write a partitioned Parquet table and save it as a Hive table at a specific path.
The code is in Java (column and table names are a bit different in my real code) and is executed by Airflow, which calls spark-submit:

aggregatedData.write()
    .format("parquet")
    .mode(SaveMode.Overwrite)
    .partitionBy("schema_partition", "colC")
    .option("path", "hdfs://sandbox.hortonworks.com:8020/BatchZone/table.parquet")
    .saveAsTable("table_info");

However, I'm getting the following exception:
[2016-07-13 10:18:53,490] {bash_operator.py:77} INFO - Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
[2016-07-13 10:18:53,490] {bash_operator.py:77} INFO - TungstenAggregate(key=[colA#43,colB#44,colC#46], functions=[(min(colD#37L),mode=Final,isDistinct=false),(max(colE#42L),mode=Final,isDistinct=false),(max(colF#41L),mode=Final,isDistinct=false),(max(colG#38L),mode=Final,isDistinct=false),(max(colH#39L),mode=Final,isDistinct=false),(max(colI#40L),mode=Final,isDistinct=false)], output=[colA#43,colB#44,colD#51L,colE#52L,colC#46,colF#53L,colG#54L,colH#55L,colI#56L])
[2016-07-13 10:18:53,490] {bash_operator.py:77} INFO - +- TungstenExchange hashpartitioning(colA#43,colB#44,colC#46,200), None
[2016-07-13 10:18:53,491] {bash_operator.py:77} INFO - +- TungstenAggregate(key=[colA#43,colB#44,colC#46], functions=[(min(colD#37L),mode=Partial,isDistinct=false),(max(colE#42L),mode=Partial,isDistinct=false),(max(colF#41L),mode=Partial,isDistinct=false),(max(colG#38L),mode=Partial,isDistinct=false),(max(colH#39L),mode=Partial,isDistinct=false),(max(colI#40L),mode=Partial,isDistinct=false)], output=[colA#43,colB#44,colC#46,min#73L,max#74L,max#75L,max#76L,max#77L,max#78L])
[2016-07-13 10:18:53,491] {bash_operator.py:77} INFO - +- Scan ParquetRelation[colE#42L,colA#43,colG#38L,colD#37L,colH#39L,colF#41L,colI#40L,colC#46,colB#44] InputPaths: hdfs://sandbox.hortonworks.com:8020/BatchZone/table.parquet
[2016-07-13 10:18:53,491] {bash_operator.py:77} INFO -
[2016-07-13 10:18:53,491] {bash_operator.py:77} INFO - at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
[2016-07-13 10:18:53,492] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
[2016-07-13 10:18:53,492] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
[2016-07-13 10:18:53,492] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
[2016-07-13 10:18:53,492] {bash_operator.py:77} INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:109)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
[2016-07-13 10:18:53,495] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
[2016-07-13 10:18:53,495] {bash_operator.py:77} INFO - at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
[2016-07-13 10:18:53,495] {bash_operator.py:77} INFO - at