In my example directories were distinct.

So If I would like to have to distinct directories ex.

/tmp/data/year=2012
/tmp/data/year=2013

It does not work with

val df = Seq((2012, "Batman")).toDF("year","title")

df.write.partitionBy("year").avro("/tmp/data")

val df2 = Seq((2013, "Batman")).toDF("year","title")

df2.write.partitionBy("year").avro("/tmp/data")


As you can see, it complains about the target directory (/tmp/data) and not 
about the partitioning keys.


org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data 
already exists.;
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)



On 22 Dec 2015, at 15:44, Yash Sharma 
<yash...@gmail.com<mailto:yash...@gmail.com>> wrote:


Well this will indeed hit the error if the next run has similar year and months 
and writing would not be possible.

You can try working around by introducing a runCount in partition or in the 
output path.

Something like-

/tmp/data/year/month/01
/tmp/data/year/month/02

Or,
/tmp/data/01/year/month
/tmp/data/02/year/month

This is a work around.

Am sure other better approaches would follow.

- Thanks, via mobile,  excuse brevity.

On Dec 22, 2015 7:01 PM, "Jan Holmberg" 
<jan.holmb...@perigeum.fi<mailto:jan.holmb...@perigeum.fi>> wrote:
Hi Yash,

the error is caused by the fact that first run creates the base directory ie. 
"/tmp/data" and the second batch stumbles to the existing base directory. I 
understand that the existing base directory is a challenge but I do not 
understand how to make this work with streaming example where each batch would 
create a new distinct directory.

Granularity has no impact. No matter how data is partitioned, second 'batch' 
always fails with existing base dir.

scala> df2.write.partitionBy("year").avro("/tmp/data")
org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data 
already exists.;
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at 
com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
at 
com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)


On 22 Dec 2015, at 14:06, Yash Sharma 
<yash...@gmail.com<mailto:yash...@gmail.com>> wrote:


Hi Jan,
Is the error because a past run of the job has already written to the location?

In that case you can add more granularity with 'time' along with year and 
month. That should give you a distinct path for every run.

Let us know if it helps or if i missed anything.

Goodluck

- Thanks, via mobile,  excuse brevity.

On Dec 22, 2015 2:31 PM, "Jan Holmberg" 
<jan.holmb...@perigeum.fi<mailto:jan.holmb...@perigeum.fi>> wrote:
Hi,
I'm stuck with writing partitioned data to hdfs. Example below ends up with 
'already exists' -error.

I'm wondering how to handle streaming use case.

What is the intended way to write streaming data to hdfs? What am I missing?

cheers,
-jan


import com.databricks.spark.avro._

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("/tmp/data")

val df2 = Seq(
(2012, 10, "Batman", 9.8),
(2012, 10, "Hero", 8.7),
(2012, 9, "Robot", 5.5),
(2011, 9, "Git", 2.0)).toDF("year", "month", "title", "rating")

df2.write.partitionBy("year", "month").avro("/tmp/data")
---------------------------------------------------------------------
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>



Reply via email to