Hi Roland, I'm a big jerk -- apparently I forgot to assign this issue to myself when I created it to track my own work. Sorry for the inconvenience, but I am already testing a fairly complex solution here. Would you be willing to review it when it's ready?
Comments on the design inline.

On Fri, May 13, 2016 at 8:48 AM, Tyler Akidau <taki...@apache.org> wrote:

> +Daniel Halperin <dhalp...@google.com>
>
> On Thu, May 12, 2016 at 10:20 AM Roland Harangozo <role...@gmail.com> wrote:
>
>> Hi All,
>>
>> I would like to fix this issue:
>> https://issues.apache.org/jira/browse/BEAM-206
>>
>> Could you please review my design proposal?
>>
>> I would copy and optionally remove the temporary files one by one as an
>> atomic operation, rather than copying all of the temporary files and then
>> removing them (if we need to remove them). This has the following benefits:

I think the notion of making this a file-at-a-time operation is wrong -- we still want to preserve the ability to make a batch request for a network file system like GCS or S3.

>> * If the move operation is supported by the file system and the file
>> retention policy is "remove", we can use the native file move (or rename)
>> operation. This could be significantly faster than copy-and-remove.

You're right that we should change the interface from copy & remove to rename (which can be internally implemented as copy & remove if a file storage requires it). This will admit faster implementations for systems that have an atomic rename operation (like file systems ;).

>> * By moving the remove operation close to the copy operation, the
>> probability is lower that we copy a file again after a failure (if one
>> file of two is moved but the other one failed, then on replay we move
>> only the one that failed rather than starting from scratch).

I'm not sure this part follows inside the Beam model. There is no easy way to force each file to be in its own bundle, so we can't really do retries (at the model level) independently for each file. You can of course follow this model inside a bulk-rename step, but you'll have to carefully consider the semantics if some rename fails and the entire operation is retried. I'm confident that this could be made into a good design!
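To make the rename-versus-copy&remove point concrete, here is a minimal sketch of the idea (not Beam's actual `FileOperations` API -- the class and method names are hypothetical): prefer the file system's native atomic move, and fall back to copy-then-delete for storage that doesn't support it.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Hypothetical sketch of a rename that uses the native (atomic) move when
 * available and falls back to copy & remove otherwise.
 */
public class RenameSketch {
  public static void rename(Path source, Path destination) throws IOException {
    try {
      // Native rename: typically a metadata-only operation on local file systems.
      Files.move(source, destination, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      // Fallback for storage without an atomic rename: copy, then delete.
      Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING);
      Files.delete(source);
    }
  }
}
```

The fallback branch is exactly the behavior the interface would hide: callers always see "rename", and only implementations that lack an atomic move pay the copy & remove cost.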
I think it's not trivial to detect correctness here. If the move "source" does not exist, can you tell a successful move from an error? (Note that it's common for users to rerun a job without deleting the old output, so the "destination" may already exist. *sigh ;)*.)

>> Regarding the concurrency, I would use an ExecutorService to run the
>> aforementioned operations simultaneously.

Seems right to me!

>> The first exception would stop (interrupt) all operations.

Per the above comments -- we need to design this step to be idempotent (or as close as we can). Stopping at the first exception may be a good thing to do, as long as retrying or resuming will result in the correct output.

>> The level of concurrency (number of threads) would be file system
>> specific and configurable. I can imagine 10+ threads give good
>> performance on GCS but bad performance on a local file system.

This is true -- you will want to tune the implementation for each file storage. I have done many experiments in the past week on GCS in particular -- the conclusion was to use batches of size 100 and about 32 concurrent threads for the best performance and robustness to failures.

>> Best regards,
>> Roland Harangozo

Thanks so much for this email and design -- it's great. Let's keep discussing. Would you be willing to review a pull request from me for the GCS part of this change? Would you like to try implementing a FileOperationsFactory <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554> for another endpoint such as AWS S3?

Thanks,
Dan
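Putting the ExecutorService, fail-fast, and idempotence points together, here is a sketch under stated assumptions (hypothetical class name; the "missing source + existing destination means already renamed" heuristic is my assumption, not settled design -- as noted above, it cannot distinguish a prior success from a stale destination left by an old run):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical sketch: bulk rename on a configurable thread pool, failing
 * fast on the first error so the whole step can be retried.
 */
public class BulkRenameSketch {
  public static void renameAll(List<Path> sources, List<Path> destinations,
                               int threads) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < sources.size(); i++) {
        final Path src = sources.get(i);
        final Path dst = destinations.get(i);
        futures.add(pool.submit(() -> {
          // Idempotence heuristic (assumption): if a retry finds the source
          // gone but the destination present, treat the rename as done.
          if (!Files.exists(src) && Files.exists(dst)) {
            return null;
          }
          Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
          return null;
        }));
      }
      for (Future<?> f : futures) {
        f.get(); // Propagates the first failure to the caller.
      }
    } finally {
      pool.shutdownNow(); // On failure, interrupt any still-running renames.
    }
  }
}
```

Because a skipped rename is indistinguishable from a completed one under this heuristic, rerunning `renameAll` after a partial failure only moves the files that failed -- which is the resume-friendly behavior the thread asks for.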