This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 97df5e703d4a891ab63a40b46c4e87d7c373168b Author: Eugene Kirpichov <kirpic...@google.com> AuthorDate: Wed Nov 15 19:51:10 2017 -0800 consolidates windowed/unwindowed finalize fns somewhat --- .../java/org/apache/beam/sdk/io/FileBasedSink.java | 138 +++++++++---- .../java/org/apache/beam/sdk/io/WriteFiles.java | 221 ++++++--------------- .../org/apache/beam/sdk/io/FileBasedSinkTest.java | 19 +- 3 files changed, 171 insertions(+), 207 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index c8bdbfc..5bc84be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -28,10 +28,12 @@ import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParamete import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -39,6 +41,7 @@ import java.io.Serializable; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -46,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -72,6 +76,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.MimeTypes; @@ -103,10 +108,9 @@ import org.slf4j.LoggerFactory; * * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the * event of failure/retry or for redundancy). However, exactly one of these executions will have its - * result passed to the finalize method. Each call to {@link Writer#open} or {@link - * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles - * transform, so even redundant or retried bundles will have a unique way of identifying their - * output. + * result passed to the finalize method. Each call to {@link Writer#open} is passed a unique + * <i>bundle id</i> when it is called by the WriteFiles transform, so even redundant or retried + * bundles will have a unique way of identifying their output. * * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness * guarantee is important; if a bundle is to be output to a file, for example, the name of the file @@ -447,7 +451,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * written, * * <ol> - * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the + * <li>{@link WriteOperation#finalizeDestination} is given a list of the temporary files containing the * output bundles. * <li>During finalize, these temporary files are copied to final output locations and named * according to a file naming template. @@ -577,17 +581,22 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * not be cleaned up. Note that {@link WriteFiles} does attempt clean up files if exceptions * are thrown, however there are still some scenarios where temporary files might be left. */ - public void removeTemporaryFiles(Set<ResourceId> filenames) throws IOException { + public void removeTemporaryFiles(Collection<ResourceId> filenames) throws IOException { removeTemporaryFiles(filenames, !windowedWrites); } @Experimental(Kind.FILESYSTEM) - protected final List<KV<FileResult<DestinationT>, ResourceId>> buildOutputFilenames( + protected final List<KV<FileResult<DestinationT>, ResourceId>> finalizeDestination( @Nullable DestinationT dest, @Nullable BoundedWindow window, @Nullable Integer numShards, - Iterable<FileResult<DestinationT>> writerResults) { - for (FileResult<DestinationT> res : writerResults) { + Collection<FileResult<DestinationT>> existingResults) throws Exception { + Collection<FileResult<DestinationT>> completeResults = + windowedWrites + ? existingResults + : createMissingEmptyShards(dest, numShards, existingResults); + + for (FileResult<DestinationT> res : completeResults) { checkArgument( Objects.equals(dest, res.getDestination()), "File result has wrong destination: expected %s, got %s", @@ -602,7 +611,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> final int effectiveNumShards; if (numShards != null) { effectiveNumShards = numShards; - for (FileResult<DestinationT> res : writerResults) { + for (FileResult<DestinationT> res : completeResults) { checkArgument( res.getShard() != UNKNOWN_SHARDNUM, "Fixed sharding into %s shards was specified, " @@ -611,8 +620,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> res); } } else { - effectiveNumShards = Iterables.size(writerResults); - for (FileResult<DestinationT> res : writerResults) { + effectiveNumShards = Iterables.size(completeResults); + for (FileResult<DestinationT> res : completeResults) { checkArgument( res.getShard() == UNKNOWN_SHARDNUM, "Runner-chosen sharding was specified, " @@ -623,7 +632,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList(); if (numShards != null) { - resultsWithShardNumbers = Lists.newArrayList(writerResults); + resultsWithShardNumbers = Lists.newArrayList(completeResults); } else { checkState( !windowedWrites, @@ -644,7 +653,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> return firstFilename.compareTo(secondFilename); } }) - .sortedCopy(writerResults); + .sortedCopy(completeResults); for (int i = 0; i < sortedByTempFilename.size(); i++) { resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i)); } @@ -672,10 +681,71 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> return outputFilenames; } + private Collection<FileResult<DestinationT>> createMissingEmptyShards( + @Nullable DestinationT dest, + @Nullable Integer numShards, + Collection<FileResult<DestinationT>> existingResults) + throws Exception { + Collection<FileResult<DestinationT>> completeResults; + LOG.info("Finalizing for destination {} num shards {}.", dest, existingResults.size()); + if (numShards != null) { + checkArgument( + existingResults.size() <= numShards, + "Fixed sharding into %s shards was specified, but got %s file results", + numShards, + existingResults.size()); + } + // We must always output at least 1 shard, and honor user-specified numShards + // if set. + Set<Integer> missingShardNums; + if (numShards == null) { + missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM); + } else { + missingShardNums = Sets.newHashSet(); + for (int i = 0; i < numShards; ++i) { + missingShardNums.add(i); + } + for (FileResult<DestinationT> res : existingResults) { + checkArgument( + res.getShard() != UNKNOWN_SHARDNUM, + "Fixed sharding into %s shards was specified, " + + "but file result %s does not specify a shard", + numShards, + res); + missingShardNums.remove(res.getShard()); + } + } + completeResults = Lists.newArrayList(existingResults); + if (!missingShardNums.isEmpty()) { + LOG.info( + "Creating {} empty output shards in addition to {} written for destination {}.", + missingShardNums.size(), + existingResults.size(), + dest); + for (int shard : missingShardNums) { + String uuid = UUID.randomUUID().toString(); + LOG.info("Opening empty writer {} for destination {}", uuid, dest); + Writer<DestinationT, ?> writer = createWriter(); + // Currently this code path is only called in the unwindowed case. + writer.open(uuid, dest); + writer.close(); + completeResults.add( + new FileResult<>( + writer.getOutputFile(), + shard, + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + dest)); + } + LOG.debug("Done creating extra shards for {}.", dest); + } + return completeResults; + } + /** * Copy temporary files to final output filenames using the file naming template. * - * <p>Can be called from subclasses that override {@link WriteOperation#finalize}. + * <p>Can be called from subclasses that override {@link WriteOperation#finalizeDestination}. * * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files * will be the same as the sorted order of the input filenames. In other words (when using @@ -686,40 +756,38 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> */ @VisibleForTesting @Experimental(Kind.FILESYSTEM) - final void copyToOutputFiles( + final void moveToOutputFiles( List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException { int numFiles = resultsToFinalFilenames.size(); - if (numFiles > 0) { - LOG.debug("Copying {} files.", numFiles); - List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size()); - List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size()); - for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { - srcFiles.add(entry.getKey().getTempFilename()); - dstFiles.add(entry.getValue()); - LOG.info( - "Will copy temporary file {} to final location {}", - entry.getKey().getTempFilename(), - entry.getValue()); - } - // During a failure case, files may have been deleted in an earlier step. Thus - // we ignore missing files here. - FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES); - } else { - LOG.info("No output files to write."); + LOG.debug("Copying {} files.", numFiles); + List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size()); + List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size()); + for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { + srcFiles.add(entry.getKey().getTempFilename()); + dstFiles.add(entry.getValue()); + LOG.info( + "Will copy temporary file {} to final location {}", + entry.getKey().getTempFilename(), + entry.getValue()); } + // During a failure case, files may have been deleted in an earlier step. Thus + // we ignore missing files here. + FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES); + removeTemporaryFiles(srcFiles); } /** * Removes temporary output files. Uses the temporary directory to find files to remove. * - * <p>Can be called from subclasses that override {@link WriteOperation#finalize}. + * <p>Can be called from subclasses that override {@link WriteOperation#finalizeDestination}. * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize * temporary files, this method will remove them. */ @VisibleForTesting @Experimental(Kind.FILESYSTEM) final void removeTemporaryFiles( - Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException { + Collection<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) + throws IOException { ResourceId tempDir = tempDirectory.get(); LOG.debug("Removing temporary bundle output files in {}.", tempDir); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 28ac1a5..9cfabfe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -26,18 +26,14 @@ import com.google.common.base.Objects; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import com.google.common.hash.Hashing; import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; @@ -66,6 +62,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; @@ -630,7 +627,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> * FileBasedSink} for a description of writer results)-one for each bundle. * * <p>The final do-once ParDo uses a singleton collection asinput and the collection of writer - * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize + * results as a side-input. In this ParDo, {@link WriteOperation#finalizeDestination} is called to finalize * the write. * * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called @@ -661,11 +658,25 @@ public class WriteFiles<UserT, DestinationT, OutputT> // PCollection. There is a dependency between this ParDo and the first (the // WriteOperation PCollection as a side input), so this will happen after the // initial ParDo. - PCollectionView<Integer> numShardsView = + final PCollectionView<Integer> numShardsView = (computeNumShards == null) ? null : input.apply(computeNumShards); List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null ? ImmutableList.<PCollectionView<Integer>>of() : ImmutableList.of(numShardsView); + SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards = + new SerializableFunction<DoFn.ProcessContext, Integer>() { + @Override + public Integer apply(DoFn<?, ?>.ProcessContext c) { + if (numShardsView != null) { + return c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + return numShardsProvider.get(); + } else { + return null; + } + } + }; + @SuppressWarnings("unchecked") Coder<BoundedWindow> shardedWindowCoder = (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); @@ -755,16 +766,11 @@ public class WriteFiles<UserT, DestinationT, OutputT> .apply(Values.<FileResult<DestinationT>>create()) .apply( "FinalizeWindowed", - ParDo.of( - new FinalizeWindowedFn<>( - numShardsView, numShardsProvider, writeOperation)) - .withSideInputs( - numShardsView == null - ? ImmutableList.<PCollectionView<?>>of() - : ImmutableList.of(numShardsView))) + ParDo.of(new FinalizeWindowedFn<>(getFixedNumShards, writeOperation)) + .withSideInputs(shardingSideInputs)) .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); } else { - final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = + PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = results.apply(View.<FileResult<DestinationT>>asIterable()); // Finalize the write in another do-once ParDo on the singleton collection containing the @@ -775,14 +781,13 @@ public class WriteFiles<UserT, DestinationT, OutputT> // For the non-windowed case, we guarantee that if no data is written but the user has // set numShards, then all shards will be written out as empty files. For this reason we // use a side input here. - PCollection<Void> singletonCollection = p.apply(Create.of((Void) null)); outputFilenames = - singletonCollection + p.apply(Create.of((Void) null)) .apply( "FinalizeUnwindowed", ParDo.of( new FinalizeUnwindowedFn<>( - numShardsView, numShardsProvider, resultsView, writeOperation)) + getFixedNumShards, resultsView, writeOperation)) .withSideInputs( FluentIterable.concat(sideInputs, shardingSideInputs) .append(resultsView) @@ -798,19 +803,16 @@ public class WriteFiles<UserT, DestinationT, OutputT> private static class FinalizeWindowedFn<DestinationT> extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> { - @Nullable private final PCollectionView<Integer> numShardsView; - @Nullable private final ValueProvider<Integer> numShardsProvider; + private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards; private final WriteOperation<DestinationT, ?> writeOperation; @Nullable private transient List<FileResult<DestinationT>> fileResults; @Nullable private Integer fixedNumShards; public FinalizeWindowedFn( - @Nullable PCollectionView<Integer> numShardsView, - @Nullable ValueProvider<Integer> numShardsProvider, + SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards, WriteOperation<DestinationT, ?> writeOperation) { - this.numShardsView = numShardsView; - this.numShardsProvider = numShardsProvider; + this.getFixedNumShards = getFixedNumShards; this.writeOperation = writeOperation; } @@ -824,58 +826,37 @@ public class WriteFiles<UserT, DestinationT, OutputT> public void processElement(ProcessContext c) { fileResults.add(c.element()); if (fixedNumShards == null) { - if (numShardsView != null) { - fixedNumShards = c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - fixedNumShards = numShardsProvider.get(); - } else { - throw new IllegalStateException( - "When finalizing a windowed write, should have set fixed sharding"); - } + fixedNumShards = getFixedNumShards.apply(c); + checkState(fixedNumShards != null, "Windowed write should have set fixed sharding"); } } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { - Set<ResourceId> tempFiles = Sets.newHashSet(); - Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> results = - groupByDestinationAndWindow(fileResults); - List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); - for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>> - destEntry : results.asMap().entrySet()) { - DestinationT destination = destEntry.getKey().getKey(); - BoundedWindow window = destEntry.getKey().getValue(); - resultsToFinalFilenames.addAll(writeOperation.buildOutputFilenames( - destination, window, fixedNumShards, destEntry.getValue())); - } - LOG.info("Will finalize {} files", resultsToFinalFilenames.size()); + List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = + finalizeAllDestinations(writeOperation, fileResults, fixedNumShards); for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { FileResult<DestinationT> res = entry.getKey(); - tempFiles.add(res.getTempFilename()); c.output( KV.of(res.getDestination(), entry.getValue().toString()), res.getWindow().maxTimestamp(), res.getWindow()); } - writeOperation.copyToOutputFiles(resultsToFinalFilenames); - writeOperation.removeTemporaryFiles(tempFiles); + writeOperation.moveToOutputFiles(resultsToFinalFilenames); } } private static class FinalizeUnwindowedFn<DestinationT> extends DoFn<Void, KV<DestinationT, String>> { - @Nullable private final PCollectionView<Integer> numShardsView; - @Nullable private final ValueProvider<Integer> numShardsProvider; + private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards; private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView; private final WriteOperation<DestinationT, ?> writeOperation; public FinalizeUnwindowedFn( - @Nullable PCollectionView<Integer> numShardsView, - @Nullable ValueProvider<Integer> numShardsProvider, + SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards, PCollectionView<Iterable<FileResult<DestinationT>>> resultsView, WriteOperation<DestinationT, ?> writeOperation) { - this.numShardsView = numShardsView; - this.numShardsProvider = numShardsProvider; + this.getFixedNumShards = getFixedNumShards; this.resultsView = resultsView; this.writeOperation = writeOperation; } @@ -883,118 +864,40 @@ public class WriteFiles<UserT, DestinationT, OutputT> @ProcessElement public void processElement(ProcessContext c) throws Exception { writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c); - @Nullable Integer fixedNumShards; - if (numShardsView != null) { - fixedNumShards = c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - fixedNumShards = numShardsProvider.get(); - } else { - fixedNumShards = null; - } - Multimap<DestinationT, FileResult<DestinationT>> resultsByDestMultimap = - ArrayListMultimap.create(); - for (FileResult<DestinationT> result : c.sideInput(resultsView)) { - resultsByDestMultimap.put(result.getDestination(), result); - } - Map<DestinationT, Collection<FileResult<DestinationT>>> resultsByDest = - resultsByDestMultimap.asMap(); - if (resultsByDest.isEmpty()) { - Collection<FileResult<DestinationT>> empty = ImmutableList.of(); - resultsByDest = - Collections.singletonMap( - writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), empty); - } - List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); - for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> - destEntry : resultsByDest.entrySet()) { - resultsToFinalFilenames.addAll( - finalizeForDestinationFillEmptyShards( - destEntry.getKey(), fixedNumShards, destEntry.getValue())); - } - Set<ResourceId> tempFiles = Sets.newHashSet(); - for (KV<FileResult<DestinationT>, ResourceId> entry : - resultsToFinalFilenames) { - tempFiles.add(entry.getKey().getTempFilename()); + List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.sideInput(resultsView)); + List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = + fileResults.isEmpty() + ? writeOperation.finalizeDestination( + writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), + GlobalWindow.INSTANCE, + getFixedNumShards.apply(c), + ImmutableList.<FileResult<DestinationT>>of()) + : finalizeAllDestinations(writeOperation, fileResults, getFixedNumShards.apply(c)); + for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString())); } - writeOperation.copyToOutputFiles(resultsToFinalFilenames); - writeOperation.removeTemporaryFiles(tempFiles); + writeOperation.moveToOutputFiles(resultsToFinalFilenames); } + } - /** - * Finalize a list of files for a single destination. If a minimum number of shards is needed, - * this function will generate empty files for this destination to ensure that all shards are - * generated. - */ - private List<KV<FileResult<DestinationT>, ResourceId>> finalizeForDestinationFillEmptyShards( - DestinationT destination, - @Nullable Integer fixedNumShards, - Collection<FileResult<DestinationT>> existingResults) - throws Exception { - checkState(!writeOperation.windowedWrites); - - LOG.info( - "Finalizing write operation {} for destination {} num shards {}.", - writeOperation, - destination, - existingResults.size()); - if (fixedNumShards != null) { - checkArgument( - existingResults.size() <= fixedNumShards, - "Fixed sharding into %s shards was specified, but got %s file results", - fixedNumShards, - existingResults.size()); - } - // We must always output at least 1 shard, and honor user-specified numShards - // if set. - Set<Integer> missingShardNums; - if (fixedNumShards == null) { - missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM); - } else { - missingShardNums = Sets.newHashSet(); - for (int i = 0; i < fixedNumShards; ++i) { - missingShardNums.add(i); - } - for (FileResult<DestinationT> res : existingResults) { - checkArgument( - res.getShard() != UNKNOWN_SHARDNUM, - "Fixed sharding into %s shards was specified, " - + "but file result %s does not specify a shard", + private static <DestinationT> + List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations( + WriteOperation<DestinationT, ?> writeOperation, + List<FileResult<DestinationT>> fileResults, + Integer fixedNumShards) + throws Exception { + List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); + Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> resultsByDestMultimap = + groupByDestinationAndWindow(fileResults); + for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>> + destEntry : resultsByDestMultimap.asMap().entrySet()) { + resultsToFinalFilenames.addAll( + writeOperation.finalizeDestination( + destEntry.getKey().getKey(), + destEntry.getKey().getValue(), fixedNumShards, - res); - missingShardNums.remove(res.getShard()); - } - } - List<FileResult<DestinationT>> completeResults = Lists.newArrayList(existingResults); - if (!missingShardNums.isEmpty()) { - LOG.info( - "Creating {} empty output shards in addition to {} written for destination {}.", - missingShardNums.size(), - existingResults.size(), - destination); - for (int shard : missingShardNums) { - String uuid = UUID.randomUUID().toString(); - LOG.info("Opening empty writer {} for destination {}", uuid, writeOperation, destination); - Writer<DestinationT, ?> writer = writeOperation.createWriter(); - // Currently this code path is only called in the unwindowed case. - writer.open(uuid, destination); - writer.close(); - completeResults.add( - new FileResult<>( - writer.getOutputFile(), - shard, - GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING, - destination)); - } - LOG.debug("Done creating extra shards for {}.", destination); - } - return - writeOperation.buildOutputFilenames( - destination, - GlobalWindow.INSTANCE, - (fixedNumShards == null) ? null : completeResults.size(), - completeResults); + destEntry.getValue())); } + return resultsToFinalFilenames; } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index f7988bb..561d036 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -45,7 +44,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.zip.GZIPInputStream; import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.FileResult; @@ -205,13 +203,8 @@ public class FileBasedSinkTest { // TODO: test with null first argument? List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames = - writeOp.buildOutputFilenames(null, null, null, fileResults); - Set<ResourceId> tempFiles = Sets.newHashSet(); - for (KV<FileResult<Void>, ResourceId> res : resultsToFinalFilenames) { - tempFiles.add(res.getKey().getTempFilename()); - } - writeOp.copyToOutputFiles(resultsToFinalFilenames); - writeOp.removeTemporaryFiles(tempFiles); + writeOp.finalizeDestination(null, null, null, fileResults); + writeOp.moveToOutputFiles(resultsToFinalFilenames); for (int i = 0; i < numFiles; i++) { ResourceId outputFilename = @@ -304,7 +297,7 @@ public class FileBasedSinkTest { } // Copy input files to output files. - writeOp.copyToOutputFiles(resultsToFinalFilenames); + writeOp.moveToOutputFiles(resultsToFinalFilenames); // Assert that the contents were copied. for (int i = 0; i < expectedOutputPaths.size(); i++) { @@ -355,7 +348,7 @@ public class FileBasedSinkTest { /** Reject non-distinct output filenames. */ @Test - public void testCollidingOutputFilenames() throws IOException { + public void testCollidingOutputFilenames() throws Exception { ResourceId root = getBaseOutputDirectory(); SimpleSink<Void> sink = SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED); @@ -366,12 +359,12 @@ public class FileBasedSinkTest { ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE); // More than one shard does. try { - Iterable<FileResult<Void>> results = + List<FileResult<Void>> results = Lists.newArrayList( new FileResult<Void>(temp1, 1 /* shard */, null, null, null), new FileResult<Void>(temp2, 1 /* shard */, null, null, null), new FileResult<Void>(temp3, 1 /* shard */, null, null, null)); - writeOp.buildOutputFilenames(null, null, 5 /* numShards */, results); + writeOp.finalizeDestination(null, null, 5 /* numShards */, results); fail("Should have failed."); } catch (IllegalArgumentException exn) { assertThat(exn.getMessage(), containsString("generated the same name")); -- To stop receiving notification emails like this one, please contact "commits@beam.apache.org" <commits@beam.apache.org>.