Thanks Harsh for the reply. When I change the code to something like this -
def saveAsLatest(df: DataFrame, fileSystem: FileSystem, bakDir: String) = {
  fileSystem.rename(new Path(bakDir + latest), new Path(bakDir + "/" + ScalaUtil.currentDateTimeString))
  fileSystem.create(new Path(bakDir + latest))
  df.write.format("com.databricks.spark.csv")
    .mode("Overwrite")
    .options(Map("mode" -> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true"))
    .save(bakDir + latest)
}

It still shows the same error. Also, if I don't do the unionAll operation, the previous code runs successfully. Anything else that I should be checking?

On Wed, Aug 17, 2016 at 1:32 PM, HARSH TAKKAR <takkarha...@gmail.com> wrote:

> Hi
>
> I can see that the exception is caused by the following; can you check
> where in your code you are using this path:
>
> Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path
> does not exist: hdfs://testcluster:8020/experiments/vol/spark_chomp_data/bak/restaurants-bak/latest
>
> On Wed, 17 Aug 2016, 10:57 p.m. max square, <max2subscr...@gmail.com>
> wrote:
>
>> /bump
>>
>> It'd be great if someone could point me in the right direction.
>>
>> On Mon, Aug 8, 2016 at 5:07 PM, max square <max2subscr...@gmail.com>
>> wrote:
>>
>>> Here's the complete stack trace -
>>> https://gist.github.com/rohann/649b0fcc9d5062ef792eddebf5a315c1
>>>
>>> For reference, here's the complete function:
>>>
>>> def saveAsLatest(df: DataFrame, fileSystem: FileSystem, bakDir: String) = {
>>>   fileSystem.rename(new Path(bakDir + latest), new Path(bakDir + "/" + ScalaUtil.currentDateTimeString))
>>>   df.write.format("com.databricks.spark.csv")
>>>     .options(Map("mode" -> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true"))
>>>     .save(bakDir + latest)
>>> }
>>>
>>> On Mon, Aug 8, 2016 at 3:41 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Mind showing the complete stack trace?
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Aug 8, 2016 at 12:30 PM, max square <max2subscr...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Ted for the prompt reply.
>>>>>
>>>>> There are three or four DFs that are coming from various sources, and
>>>>> I'm doing a unionAll on them:
>>>>>
>>>>> val placesProcessed = placesUnchanged
>>>>>   .unionAll(placesAddedWithMerchantId)
>>>>>   .unionAll(placesUpdatedFromHotelsWithMerchantId)
>>>>>   .unionAll(placesUpdatedFromRestaurantsWithMerchantId)
>>>>>   .unionAll(placesChanged)
>>>>>
>>>>> I'm using Spark 1.6.2.
>>>>>
>>>>> On Mon, Aug 8, 2016 at 3:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Can you show the code snippet for the unionAll operation?
>>>>>>
>>>>>> Which Spark release do you use?
>>>>>>
>>>>>> BTW please use user@spark.apache.org in the future.
>>>>>>
>>>>>> On Mon, Aug 8, 2016 at 11:47 AM, max square <max2subscr...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey guys,
>>>>>>>
>>>>>>> I'm trying to save a DataFrame in CSV format after performing unionAll
>>>>>>> operations on it, but I get this exception:
>>>>>>>
>>>>>>> Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
>>>>>>> TungstenExchange hashpartitioning(mId#430,200)
>>>>>>>
>>>>>>> I'm saving it with:
>>>>>>>
>>>>>>> df.write.format("com.databricks.spark.csv")
>>>>>>>   .options(Map("mode" -> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true"))
>>>>>>>   .save(bakDir + latest)
>>>>>>>
>>>>>>> It works perfectly if I don't do the unionAll operation.
>>>>>>> The format looks the same when I print part of the results.
>>>>>>>
>>>>>>> Any help regarding this would be appreciated.
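
[Editor's note] The symptom reported above (InvalidInputException for bakDir + latest, but only when the union is involved) is consistent with Spark's lazy evaluation: df.write is what actually triggers reading the source files, so if one of the unioned DataFrames was originally loaded from bakDir + latest, the rename at the top of saveAsLatest removes the input directory before it is read. This is a guess, not confirmed in the thread. A minimal sketch of a write-to-temp-then-swap variant that avoids touching the source directory until the data is materialized (the temp-dir name is hypothetical; Spark 1.6 / spark-csv APIs assumed, ScalaUtil is the poster's own helper):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.DataFrame

def saveAsLatest(df: DataFrame, fileSystem: FileSystem, bakDir: String): Unit = {
  val latest = "/latest"            // assumption: `latest` in the thread is a path suffix like this
  val tmp    = bakDir + "/latest_tmp"  // hypothetical temp directory

  // 1. Write to the temp directory first. Because Spark is lazy, this save()
  //    is the point where the files under bakDir + latest are actually read,
  //    so they must still exist here.
  df.write.format("com.databricks.spark.csv")
    .options(Map("mode" -> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true"))
    .save(tmp)

  // 2. Only after the output is fully materialized, archive the old directory
  //    and move the freshly written one into place.
  fileSystem.rename(new Path(bakDir + latest),
                    new Path(bakDir + "/" + System.currentTimeMillis.toString))
  fileSystem.rename(new Path(tmp), new Path(bakDir + latest))
}
```

An alternative under the same assumption would be to force materialization before the rename (e.g. df.persist() followed by df.count()), but writing to a temp directory and swapping also leaves the old data intact if the job fails mid-write.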