Based on my experience, if you use "simple" streaming (i.e. you do not use windows), then after every mini-batch the "save" will create a directory in HDFS with the timestamp as part of the path. Within that directory, a separate part file will be created for each partition. If you used windowing, you could probably write several mini-batches at one time (I have not used windows, so I am speculating about the behavior). If you are running batch processing, you could easily merge many small files if needed, as in the sketch below.
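A minimal sketch of that batch-side merge, assuming the streaming job wrote text output under timestamped /tmp/rdd_stuff-* directories as in Mich's example further down the thread; the glob, the target partition count, and the output path are all illustrative:

    // Batch job: read the many small part files produced by the
    // streaming "save" and rewrite them as a few larger files.
    import org.apache.spark.{SparkConf, SparkContext}

    object MergeSmallFiles {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("MergeSmallFiles"))
        // The glob matches every per-batch directory and its part files.
        val all = sc.textFile("/tmp/rdd_stuff-*/part-*")
        // coalesce() reduces the partition count without a full shuffle,
        // so the rewrite produces at most four larger files.
        all.coalesce(4).saveAsTextFile("/tmp/rdd_stuff_merged")
        sc.stop()
      }
    }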
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Tuesday, April 5, 2016 at 4:17 PM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

> I agree: every time an OS file is created, it requires a context switch plus a file descriptor. It is probably more time-consuming to open and close these files than to do the actual work.
>
> I have always wondered about the performance implications of Spark streaming, and although there are some early-days results, I have yet to see any concrete P&T on this.
>
> My issue is that I want to use Spark streaming with Complex Event Processing, by developing adaptors (a combination of filters and mappers) to distinguish signal from pedestal in real terms, and only save data to persistent storage (HDFS) if it is of value.
>
> I am using Kafka upstream and that does a good job. Now I am trying to experiment with saving data to HDFS in one form or shape. Basically this is just immutable data, so the fewer partitions the better. I am happy to store the data as text, Parquet or ORC (in Hive), as long as it works.
>
> Regards
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> On 5 April 2016 at 23:59, Andy Davidson <a...@santacruzintegration.com> wrote:
>> In my experience with streaming, I was getting tens of thousands of empty files created in HDFS. This was crushing my system's performance when my batch jobs ran over the data sets. There is a lot of overhead in opening and closing empty files.
>>
>> I think creating empty files, or keeping empty partitions around, is probably a bug; however, I never filed a bug report. Please file one, and copy me on the Jira.
>>
>> There is also a related performance issue. I use repartition() to ensure the CSV files have a maximum number of rows (it is a product requirement, to make the CSV files more user friendly). In my experience, if I do not repartition, a partition with a single row of data causes a separate part-* file to be created, and I wound up with a large number of very small files. I have always wondered how to configure partitions to get better performance. I would think we are better off with a few very large partitions in most cases, i.e. keep more in memory with less overhead. I was really hoping Spark would handle this for me automatically.
>>
>> Andy
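The row cap Andy describes above, as a minimal Scala sketch (his actual Java version appears later in the thread; the function name and the 200-row default here are illustrative):

    // Choose the partition count from the record count so that each
    // output part file holds at most roughly maxRowsPerFile rows.
    import org.apache.spark.rdd.RDD

    def capRowsPerFile(rdd: RDD[String], maxRowsPerFile: Int = 200): RDD[String] = {
      val count = rdd.count()
      val numPartitions = (count / maxRowsPerFile + 1).toInt
      // repartition() shuffles rows evenly across the new partitions;
      // coalesce() would be cheaper, but it can only shrink the count.
      rdd.repartition(numPartitions)
    }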
>> From: Mich Talebzadeh <mich.talebza...@gmail.com>
>> Date: Tuesday, April 5, 2016 at 3:49 PM
>> To: Andrew Davidson <a...@santacruzintegration.com>
>> Cc: "user @spark" <user@spark.apache.org>
>> Subject: Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS
>>
>>> Thanks Andy.
>>>
>>> Do we know if this is a known bug, or simply a feature, that on the face of it Spark cannot save RDD output to a text file?
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> On 5 April 2016 at 23:35, Andy Davidson <a...@santacruzintegration.com> wrote:
>>>> Hi Mich
>>>>
>>>> Yup, I was surprised to find empty files. It is easy to work around. Note that I should probably use coalesce() and not repartition().
>>>>
>>>> In general I found I almost always need to repartition. I was getting thousands of empty partitions; it was really slowing my system down.
>>>>
>>>> import java.io.File;
>>>>
>>>> import org.apache.spark.api.java.JavaRDD;
>>>> import org.apache.spark.api.java.function.VoidFunction2;
>>>> import org.apache.spark.streaming.Time;
>>>> import org.apache.spark.streaming.api.java.JavaDStream;
>>>>
>>>> private static void save(JavaDStream<String> json, String outputURIBase) {
>>>>     /*
>>>>      * Using saveAsTextFiles() causes lots of empty directories to be created:
>>>>      *
>>>>      * DStream<String> data = json.dstream();
>>>>      * data.saveAsTextFiles(outputURI, null);
>>>>      */
>>>>
>>>>     json.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>>>>         private static final long serialVersionUID = 1L;
>>>>
>>>>         @Override
>>>>         public void call(JavaRDD<String> rdd, Time time) throws Exception {
>>>>             Long count = rdd.count();
>>>>             // if (!rdd.isEmpty()) {
>>>>             if (count > 0) {
>>>>                 rdd = repartition(rdd, count.intValue());
>>>>                 long milliSeconds = time.milliseconds();
>>>>                 String date = Utils.convertMillisecondsToDateStr(milliSeconds);
>>>>                 String dirPath = outputURIBase
>>>>                         + File.separator + date
>>>>                         + File.separator + "tweet-" + time.milliseconds();
>>>>                 rdd.saveAsTextFile(dirPath);
>>>>             }
>>>>         }
>>>>
>>>>         final int maxNumRowsPerFile = 200;
>>>>
>>>>         // Pick the partition count so that each part file holds at
>>>>         // most roughly maxNumRowsPerFile rows.
>>>>         JavaRDD<String> repartition(JavaRDD<String> rdd, int count) {
>>>>             long numPartitions = count / maxNumRowsPerFile + 1;
>>>>             return rdd.repartition((int) numPartitions);
>>>>         }
>>>>     });
>>>> }
>>>>
>>>> From: Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> Date: Tuesday, April 5, 2016 at 3:06 PM
>>>> To: "user @spark" <user@spark.apache.org>
>>>> Subject: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS
>>>>
>>>>> Spark 1.6.1
>>>>>
>>>>> The following creates empty files. It prints the lines OK with println:
>>>>>
>>>>> val result = lines.filter(_.contains("ASE 15")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>> result.saveAsTextFiles("/tmp/rdd_stuff")
>>>>>
>>>>> I am getting zero-length files:
>>>>>
>>>>> drwxr-xr-x - hduser supergroup 0 2016-04-05 23:19 /tmp/rdd_stuff-1459894755000
>>>>> drwxr-xr-x - hduser supergroup 0 2016-04-05 23:20 /tmp/rdd_stuff-1459894810000
>>>>>
>>>>> Any ideas?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>> http://talebzadehmich.wordpress.com
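For reference, the workaround the thread converges on (save from foreachRDD and skip empty mini-batches, as in Andy's Java code) applied to Mich's Scala snippet; this is a sketch, and the single-partition coalesce and output path are illustrative:

    val result = lines.filter(_.contains("ASE 15"))
      .flatMap(line => line.split("\n,"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // saveAsTextFiles() writes a directory for every batch interval, even
    // empty ones. Saving from foreachRDD lets us skip the empty batches.
    result.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        rdd.map { case (word, n) => s"$word,$n" }
          .coalesce(1) // one part file per batch; tune for data volume
          .saveAsTextFile(s"/tmp/rdd_stuff-${time.milliseconds}")
      }
    }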