This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch release-2.14.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.14.0 by this push: new a7449df [BEAM-7689] make a temporary directory unique for FileBaseSink new 3187089 Merge pull request #9039 from ihji/cherry-pick-7689 a7449df is described below commit a7449df21c28db320f9757a44953f3b09d2002e3 Author: Heejong Lee <heej...@gmail.com> AuthorDate: Mon Jul 8 14:18:42 2019 -0700 [BEAM-7689] make a temporary directory unique for FileBaseSink --- .../java/org/apache/beam/sdk/io/FileBasedSink.java | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index eff8a7c..2c21d5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -40,7 +40,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -82,9 +81,6 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; -import org.joda.time.Instant; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -510,7 +506,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * * <p>Default is a uniquely named subdirectory of the provided tempDirectory, e.g. if * tempDirectory is /path/to/foo/, the temporary directory will be - * /path/to/foo/temp-beam-foo-$date. + * /path/to/foo/.temp-beam-$uuid. * * @param sink the FileBasedSink that will be used to configure this write operation. */ @@ -522,20 +518,12 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> private static class TemporaryDirectoryBuilder implements SerializableFunction<ResourceId, ResourceId> { - private static final AtomicLong TEMP_COUNT = new AtomicLong(0); - private static final DateTimeFormatter TEMPDIR_TIMESTAMP = - DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss"); - // The intent of the code is to have a consistent value of tempDirectory across - // all workers, which wouldn't happen if now() was called inline. - private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP); - // Multiple different sinks may be used in the same output directory; use tempId to create a - // separate temp directory for each. - private final Long tempId = TEMP_COUNT.getAndIncrement(); + private final UUID tempUUID = UUID.randomUUID(); @Override public ResourceId apply(ResourceId tempDirectory) { - // Temp directory has a timestamp and a unique ID - String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s-%s", timestamp, tempId); + // Temp directory has a random UUID postfix (BEAM-7689) + String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s", tempUUID); return tempDirectory .getCurrentDirectory() .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);