[ https://issues.apache.org/jira/browse/SPARK-19523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu updated SPARK-19523:
---------------------------------
    Component/s:     (was: Structured Streaming)

> Spark streaming+ insert into table leaves bunch of trash in table directory
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-19523
>                 URL: https://issues.apache.org/jira/browse/SPARK-19523
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Egor Pahomov
>            Priority: Minor
>
> I have very simple code, which transforms incoming JSON files into a Parquet table:
> {code}
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
> import org.apache.spark.sql.SaveMode
>
> object Client_log {
>   def main(args: Array[String]): Unit = {
>     // Read the target table's column names and types from an existing partition.
>     val resultCols = new HiveContext(Spark.ssc.sparkContext).sql(s"select * from temp.x_streaming where year=2015 and month=12 and day=1").dtypes
>     // Build one "cast(get_json_object(...))" expression per target column.
>     var columns = resultCols.filter(x => !Commons.stopColumns.contains(x._1)).map({ case (name, types) => {
>       s"""cast (get_json_object(s, '""" + '$' + s""".properties.${name}') as ${Commons.mapType(types)}) as $name"""
>     }
>     })
>     columns ++= List("'streaming' as sourcefrom")
>
>     // Path filter that accepts every file.
>     def f(path: Path): Boolean = {
>       true
>     }
>
>     val client_log_d_stream = Spark.ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/egor/test2", f _, newFilesOnly = false)
>     // For every batch: parse the JSON lines and append them to the Hive table.
>     client_log_d_stream.foreachRDD(rdd => {
>       val localHiveContext = new HiveContext(rdd.sparkContext)
>       import localHiveContext.implicits._
>       var input = rdd.map(x => Record(x._2.toString)).toDF()
>       input = input.selectExpr(columns: _*)
>       input = SmallOperators.populate(input, resultCols)
>       input
>         .write
>         .mode(SaveMode.Append)
>         .format("parquet")
>         .insertInto("temp.x_streaming")
>     })
>     Spark.ssc.start()
>     Spark.ssc.awaitTermination()
>   }
>
>   case class Record(s: String)
> }
> {code}
> This code generates a lot of trash
> directories in the result table, like:
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-00_298_7130707897870357017-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-00_309_6225285476054854579-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-06_305_2185311414031328806-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-06_309_6331022557673464922-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-12_334_1333065569942957405-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-12_387_3622176537686712754-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-18_339_1008134657443203932-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-18_421_3284019142681396277-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-24_291_5985064758831763168-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-24_300_6751765745457248879-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-30_314_2987765230093671316-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-30_331_2746678721907502111-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-36_311_1466065813702202959-1
> drwxrwxrwt  3 egor nobody 4096 Feb  8 14:15 .hive-staging_hive_2017-02-08_14-15-36_317_7079974647544197072-1

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)