Hi Roland,

I'm a big jerk, but apparently I forgot to assign this issue to myself when
I created it to track my own work. Sorry for the inconvenience, but I am
already testing a fairly complex solution here. Would you be willing to
review it when it's ready?

Comments on the design inline.

On Fri, May 13, 2016 at 8:48 AM, Tyler Akidau <taki...@apache.org> wrote:

> +Daniel Halperin <dhalp...@google.com>
>
>
> On Thu, May 12, 2016 at 10:20 AM Roland Harangozo <role...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I would like to fix this issue:
>> https://issues.apache.org/jira/browse/BEAM-206
>>
>> Could you please revise my design proposal?
>>
>> I would copy and optionally remove the temporary files one by one as an
>> atomic operation, rather than copying all of the temporary files and then
>> removing them (if we need to remove them). It has the following benefits:
>>
>
I think the notion of making this a file-at-a-time operation is wrong -- we
still want to preserve the ability to make a batch request to a network
file system like GCS or S3.

>> * If the move operation is supported by the file system and the file
>> retention policy is "remove", we can use the native file move operation
>> (or rename). This could be significantly faster than copy and remove.
>>
>
You're right that we should change the interface from copy & remove to
rename (which can be internally implemented as copy & remove if a storage
system requires it). This will admit faster implementations for systems
that have an atomic rename operation (like file systems ;).
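As a rough illustration of that interface change, here is a minimal sketch
(plain java.nio, not Beam code; the class name is made up) of a rename that
uses the native atomic move when the file system supports it and falls back
to copy & remove otherwise:

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative only -- not Beam code. A rename that prefers the native
// atomic move and falls back to copy & remove when the backing store
// does not support atomic moves.
public class RenameSketch {
  public static void rename(Path src, Path dst) throws IOException {
    try {
      // Native move/rename: atomic and fast on real file systems.
      Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      // Fallback for storage that lacks an atomic rename:
      // copy the bytes first, then remove the source.
      Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
      Files.delete(src);
    }
  }
}
```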


>> * By moving the remove operation close to the copy operation, we lower
>> the probability of copying a file again after a failure (if one of two
>> files is moved but the other one failed, a replay moves only the one
>> that failed rather than starting from scratch).
>>
>
I'm not sure this part follows inside the Beam model. There is no easy way
to force each file into its own bundle, so we can't really do retries
independently for each file (at the model level).

You can of course follow this model inside a bulk-rename step, but you'll
have to carefully consider the semantics if some rename fails and the
entire operation is retried. I'm confident that this could be made into a
good design!

I think it's not trivial to detect correctness here. If the move "source"
does not exist, can you tell a successful move from an error? (Note that
it's common for users to rerun a job without deleting the old output, so
the "destination" may already exist. *sigh ;)*.)
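To make the ambiguity concrete, here is a hedged sketch (illustrative names,
not Beam API) of one possible heuristic: treat a missing source with an
existing destination as an already-completed rename. As noted above, this
cannot distinguish a retried rename from stale output left by an earlier job
run, so it is a design point to resolve rather than a definitive answer:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative only -- not Beam API. One heuristic for an idempotent
// rename: a missing source with an existing destination is treated as a
// rename that already succeeded on a previous attempt. This cannot tell
// a retried rename apart from stale output of an earlier job run.
public class IdempotentRename {
  public static void renameIdempotent(Path src, Path dst) throws IOException {
    if (!Files.exists(src)) {
      if (Files.exists(dst)) {
        return; // Assume a previous (retried) attempt already moved it.
      }
      throw new IOException(
          "source " + src + " is gone and destination " + dst + " is absent");
    }
    Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
  }
}
```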

>> Regarding concurrency, I would use an ExecutorService to run the
>> aforementioned operations simultaneously.
>
>
Seems right to me!


>> The first exception would stop
>> (interrupt) all operations.
>>
>
Per the above comments -- we need to design this step to be idempotent (or
as close to it as we can get). Stopping at the first exception may be a
good thing to do, as long as retrying or resuming will result in the
correct output.
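One way this could look, as a sketch only (the class name and the Callable
task type are stand-ins, not Beam code): run the renames on an
ExecutorService, surface the first failure, and cancel the outstanding
work, relying on each rename being idempotent so that a retry of the whole
step still produces correct output:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only -- class name and task type are stand-ins. Runs the
// rename tasks concurrently; on the first failure, cancels the remaining
// tasks and rethrows. Correctness on retry relies on each task being
// idempotent.
public class ParallelRename {
  public static void runAll(List<Callable<Void>> renames, int threads)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      CompletionService<Void> done = new ExecutorCompletionService<>(pool);
      List<Future<Void>> futures = new ArrayList<>();
      for (Callable<Void> rename : renames) {
        futures.add(done.submit(rename));
      }
      for (int i = 0; i < renames.size(); i++) {
        try {
          done.take().get(); // Surfaces the first failure we observe.
        } catch (ExecutionException e) {
          for (Future<Void> f : futures) {
            f.cancel(true); // Interrupt everything still running.
          }
          throw e;
        }
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```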


>
>> The level of concurrency (number of threads) would be file-system
>> specific and configurable. I can imagine that 10+ threads gives good
>> performance on GCS but bad performance on a local file system.
>>
>
This is true -- you will want to tune the implementation for each storage
system.

I have run many experiments on GCS in particular over the past week -- the
conclusion was to use batches of size 100 and about 32 concurrent threads
for the best performance as well as robustness to failures.
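For illustration, the batching half of that conclusion might look like the
sketch below (a generic partition helper, not Beam code); each batch of 100
would then be issued as one bulk request, with roughly 32 batches in flight
at a time:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only -- a generic helper, not Beam code. Splits the list
// of files to rename into batches (e.g. of 100 for GCS); each batch would
// then be issued as one bulk request, with ~32 in flight concurrently.
public class Batching {
  public static <T> List<List<T>> partition(List<T> items, int batchSize) {
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < items.size(); i += batchSize) {
      batches.add(new ArrayList<>(
          items.subList(i, Math.min(i + batchSize, items.size()))));
    }
    return batches;
  }
}
```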


>
>> Best regards,
>> Roland Harangozo
>>
>
Thanks so much for this email and design -- it's great. Let's keep
discussing. Also, would you be willing to review a pull request from me
for the GCS part of this change?

Would you like to try implementing a FileOperationsFactory
<https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554>
for another endpoint such as AWS S3?

Thanks,
Dan
