This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5b600da86a21e2c4339261d73fa1c2588cb3ab8d Author: Eugene Kirpichov <kirpic...@google.com> AuthorDate: Fri Nov 17 12:38:15 2017 -0800 Converts WriteFiles to AutoValue --- .../core/construction/WriteFilesTranslation.java | 5 +- .../construction/WriteFilesTranslationTest.java | 12 +- .../java/org/apache/beam/sdk/io/FileBasedSink.java | 4 +- .../java/org/apache/beam/sdk/io/WriteFiles.java | 187 +++++++++------------ .../org/apache/beam/sdk/io/WriteFilesTest.java | 22 +-- 5 files changed, 104 insertions(+), 126 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index d0b2182..a6dd55c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -85,12 +85,13 @@ public class WriteFilesTranslation { @Override public boolean isWindowedWrites() { - return transform.isWindowedWrites(); + return transform.getWindowedWrites(); } @Override public boolean isRunnerDeterminedSharding() { - return transform.getNumShards() == null && transform.getSharding() == null; + return transform.getNumShardsProvider() == null + && transform.getComputeNumShards() == null; } }, components); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java index ccb366e..2d45681 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java @@ -80,9 +80,11 @@ public class WriteFilesTranslationTest { assertThat( payload.getRunnerDeterminedSharding(), - equalTo(writeFiles.getNumShards() == null && writeFiles.getSharding() == null)); + equalTo( + writeFiles.getNumShardsProvider() == null + && writeFiles.getComputeNumShards() == null)); - assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites())); + assertThat(payload.getWindowedWrites(), equalTo(writeFiles.getWindowedWrites())); assertThat( (FileBasedSink<String, Void, String>) @@ -102,11 +104,13 @@ public class WriteFilesTranslationTest { assertThat( WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform), - equalTo(writeFiles.getNumShards() == null && writeFiles.getSharding() == null)); + equalTo( + writeFiles.getNumShardsProvider() == null + && writeFiles.getComputeNumShards() == null)); assertThat( WriteFilesTranslation.isWindowedWrites(appliedPTransform), - equalTo(writeFiles.isWindowedWrites())); + equalTo(writeFiles.getWindowedWrites())); assertThat( WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform), equalTo(writeFiles.getSink())); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 5bc84be..20d2a27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -451,8 +451,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> * written, * * <ol> - * <li>{@link WriteOperation#finalizeDestination} is given a list of the temporary files containing the - * output bundles. + * <li>{@link WriteOperation#finalizeDestination} is given a list of the temporary files + * containing the output bundles. * <li>During finalize, these temporary files are copied to final output locations and named * according to a file naming template. * <li>Finally, any temporary files that were created during the write are removed. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 0a538b1..d6c5788 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import com.google.common.base.Objects; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; @@ -108,7 +109,8 @@ import org.slf4j.LoggerFactory; * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre> */ @Experimental(Experimental.Kind.SOURCE_SINK) -public class WriteFiles<UserT, DestinationT, OutputT> +@AutoValue +public abstract class WriteFiles<UserT, DestinationT, OutputT> extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> { private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class); @@ -125,19 +127,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; static final int UNKNOWN_SHARDNUM = -1; - private FileBasedSink<UserT, DestinationT, OutputT> sink; private @Nullable WriteOperation<DestinationT, OutputT> writeOperation; - // This allows the number of shards to be dynamically computed based on the input - // PCollection. - private final @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards; - // We don't use a side input for static sharding, as we want this value to be updatable - // when a pipeline is updated. - private final @Nullable ValueProvider<Integer> numShardsProvider; - private final boolean windowedWrites; - private int maxNumWritersPerBundle; - // This is the set of side inputs used by this transform. This is usually populated by the users's - // DynamicDestinations object. - private final List<PCollectionView<?>> sideInputs; /** * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting @@ -146,57 +136,59 @@ public class WriteFiles<UserT, DestinationT, OutputT> public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to( FileBasedSink<UserT, DestinationT, OutputT> sink) { checkArgument(sink != null, "sink can not be null"); - return new WriteFiles<>( - sink, - null /* runner-determined sharding */, - null, - false, - DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE, - sink.getDynamicDestinations().getSideInputs()); + return new AutoValue_WriteFiles.Builder<UserT, DestinationT, OutputT>() + .setSink(sink) + .setComputeNumShards(null) + .setNumShardsProvider(null) + .setWindowedWrites(false) + .setMaxNumWritersPerBundle(DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE) + .setSideInputs(sink.getDynamicDestinations().getSideInputs()) + .build(); } - private WriteFiles( - FileBasedSink<UserT, DestinationT, OutputT> sink, - @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards, - @Nullable ValueProvider<Integer> numShardsProvider, - boolean windowedWrites, - int maxNumWritersPerBundle, - List<PCollectionView<?>> sideInputs) { - this.sink = sink; - this.computeNumShards = computeNumShards; - this.numShardsProvider = numShardsProvider; - this.windowedWrites = windowedWrites; - this.maxNumWritersPerBundle = maxNumWritersPerBundle; - this.sideInputs = sideInputs; - } + public abstract FileBasedSink<UserT, DestinationT, OutputT> getSink(); - @Override - public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return PCollectionViews.toAdditionalInputs(sideInputs); - } + @Nullable + public abstract PTransform<PCollection<UserT>, PCollectionView<Integer>> getComputeNumShards(); - /** Returns the {@link FileBasedSink} associated with this PTransform. */ - public FileBasedSink<UserT, DestinationT, OutputT> getSink() { - return sink; - } + // We don't use a side input for static sharding, as we want this value to be updatable + // when a pipeline is updated. + @Nullable + public abstract ValueProvider<Integer> getNumShardsProvider(); - /** Returns whether or not to perform windowed writes. */ - public boolean isWindowedWrites() { - return windowedWrites; - } + public abstract boolean getWindowedWrites(); - /** - * Gets the {@link PTransform} that will be used to determine sharding. This can be either a - * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by - * {@link #withSharding(PTransform)}), or runner-determined (by {@link - * #withRunnerDeterminedSharding()}. - */ - public @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() { - return computeNumShards; + abstract int getMaxNumWritersPerBundle(); + + abstract List<PCollectionView<?>> getSideInputs(); + + abstract Builder<UserT, DestinationT, OutputT> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<UserT, DestinationT, OutputT> { + abstract Builder<UserT, DestinationT, OutputT> setSink( + FileBasedSink<UserT, DestinationT, OutputT> sink); + + abstract Builder<UserT, DestinationT, OutputT> setComputeNumShards( + PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards); + + abstract Builder<UserT, DestinationT, OutputT> setNumShardsProvider( + ValueProvider<Integer> numShardsProvider); + + abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean windowedWrites); + + abstract Builder<UserT, DestinationT, OutputT> setMaxNumWritersPerBundle( + int maxNumWritersPerBundle); + + abstract Builder<UserT, DestinationT, OutputT> setSideInputs( + List<PCollectionView<?>> sideInputs); + + abstract WriteFiles<UserT, DestinationT, OutputT> build(); } - public @Nullable ValueProvider<Integer> getNumShards() { - return numShardsProvider; + @Override + public Map<TupleTag<?>, PValue> getAdditionalInputs() { + return PCollectionViews.toAdditionalInputs(getSideInputs()); } /** @@ -225,36 +217,18 @@ public class WriteFiles<UserT, DestinationT, OutputT> */ public WriteFiles<UserT, DestinationT, OutputT> withNumShards( ValueProvider<Integer> numShardsProvider) { - return new WriteFiles<>( - sink, - computeNumShards, - numShardsProvider, - windowedWrites, - maxNumWritersPerBundle, - sideInputs); + return toBuilder().setNumShardsProvider(numShardsProvider).build(); } /** Set the maximum number of writers created in a bundle before spilling to shuffle. */ public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle( int maxNumWritersPerBundle) { - return new WriteFiles<>( - sink, - computeNumShards, - numShardsProvider, - windowedWrites, - maxNumWritersPerBundle, - sideInputs); + return toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build(); } public WriteFiles<UserT, DestinationT, OutputT> withSideInputs( List<PCollectionView<?>> sideInputs) { - return new WriteFiles<>( - sink, - computeNumShards, - numShardsProvider, - windowedWrites, - maxNumWritersPerBundle, - sideInputs); + return toBuilder().setSideInputs(sideInputs).build(); } /** @@ -268,8 +242,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) { checkArgument( sharding != null, "sharding can not be null. Use withRunnerDeterminedSharding() instead."); - return new WriteFiles<>( - sink, sharding, null, windowedWrites, maxNumWritersPerBundle, sideInputs); + return toBuilder().setComputeNumShards(sharding).build(); } /** @@ -277,7 +250,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> * runner-determined sharding. */ public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() { - return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle, sideInputs); + return toBuilder().setComputeNumShards(null).setNumShardsProvider(null).build(); } /** @@ -293,35 +266,34 @@ public class WriteFiles<UserT, DestinationT, OutputT> * <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value. */ public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() { - return new WriteFiles<>( - sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs); + return toBuilder().setWindowedWrites(true).build(); } @Override public void validate(PipelineOptions options) { - sink.validate(options); + getSink().validate(options); } @Override public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) { if (input.isBounded() == IsBounded.UNBOUNDED) { checkArgument( - windowedWrites, + getWindowedWrites(), "Must use windowed writes when applying %s to an unbounded PCollection", WriteFiles.class.getSimpleName()); } - if (windowedWrites) { + if (getWindowedWrites()) { // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438 // and similar behavior in other runners. checkArgument( - computeNumShards != null || numShardsProvider != null, + getComputeNumShards() != null || getNumShardsProvider() != null, "When using windowed writes, must specify number of output shards explicitly", WriteFiles.class.getSimpleName()); } - this.writeOperation = sink.createWriteOperation(); - this.writeOperation.setWindowedWrites(windowedWrites); + this.writeOperation = getSink().createWriteOperation(); + this.writeOperation.setWindowedWrites(getWindowedWrites()); - if (!windowedWrites) { + if (!getWindowedWrites()) { // Re-window the data into the global window and remove any existing triggers. input = input.apply( @@ -347,10 +319,10 @@ public class WriteFiles<UserT, DestinationT, OutputT> FileResultCoder.of(windowCoder, destinationCoder); PCollectionView<Integer> numShardsView = - (computeNumShards == null) ? null : input.apply(computeNumShards); + (getComputeNumShards() == null) ? null : input.apply(getComputeNumShards()); PCollection<FileResult<DestinationT>> tempFileResults = - (computeNumShards == null && numShardsProvider == null) + (getComputeNumShards() == null && getNumShardsProvider() == null) ? input.apply( "WriteUnshardedBundlesToTempFiles", new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder)) @@ -370,13 +342,14 @@ public class WriteFiles<UserT, DestinationT, OutputT> public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder - .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink")) - .include("sink", sink); - if (getSharding() != null) { - builder.include("sharding", getSharding()); + .add(DisplayData.item("sink", getSink().getClass()).withLabel("WriteFiles Sink")) + .include("sink", getSink()); + if (getComputeNumShards() != null) { + builder.include("sharding", getComputeNumShards()); } else { builder.addIfNotNull( - DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards")); + DisplayData.item("numShards", getNumShardsProvider()) + .withLabel("Fixed Number of Shards")); } } @@ -395,7 +368,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> @Override public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) { - if (windowedWrites) { + if (getWindowedWrites()) { // Reshuffle the results to make them stable against retries. // Use a single void key to maximize size of bundles for finalization. return input @@ -437,7 +410,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> ParDo.of( new WriteUnshardedTempFilesWithSpillingFn( spilledRecordsTag, destinationCoder)) - .withSideInputs(sideInputs) + .withSideInputs(getSideInputs()) .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag))); PCollection<FileResult<DestinationT>> writtenBundleFiles = writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); @@ -456,7 +429,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create()) .apply( "WriteSpilled", - ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs)) + ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs())) .setCoder(fileResultCoder) .apply( "DropShardNum", @@ -514,7 +487,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination); Writer<DestinationT, OutputT> writer = writers.get(key); if (writer == null) { - if (writers.size() <= maxNumWritersPerBundle) { + if (writers.size() <= getMaxNumWritersPerBundle()) { String uuid = UUID.randomUUID().toString(); LOG.info( "Opening writer {} for window {} pane {} destination {}", @@ -656,7 +629,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()) .apply( "WriteShardsIntoTempFiles", - ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs)) + ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs())) .setCoder(fileResultCoder); } } @@ -680,8 +653,8 @@ public class WriteFiles<UserT, DestinationT, OutputT> if (numShardsView != null) { shardCount = context.sideInput(numShardsView); } else { - checkNotNull(numShardsProvider); - shardCount = numShardsProvider.get(); + checkNotNull(getNumShardsProvider()); + shardCount = getNumShardsProvider().get(); } checkArgument( shardCount > 0, @@ -771,7 +744,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> public WriteFilesResult<DestinationT> expand( PCollection<Iterable<FileResult<DestinationT>>> input) { - List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(sideInputs); + List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(getSideInputs()); if (numShardsView != null) { finalizeSideInputs.add(numShardsView); } @@ -794,10 +767,10 @@ public class WriteFiles<UserT, DestinationT, OutputT> @Nullable Integer fixedNumShards; if (numShardsView != null) { fixedNumShards = c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - fixedNumShards = numShardsProvider.get(); + } else if (getNumShardsProvider() != null) { + fixedNumShards = getNumShardsProvider().get(); } else { - checkState(!windowedWrites, "Windowed write should have set fixed sharding"); + checkState(!getWindowedWrites(), "Windowed write should have set fixed sharding"); fixedNumShards = null; } List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 40ae0ea..b68cbf9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -329,21 +329,21 @@ public class WriteFilesTest { WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(3); assertThat((SimpleSink<Void>) write.getSink(), is(sink)); PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding = - write.getSharding(); + write.getComputeNumShards(); - assertThat(write.getSharding(), is(nullValue())); - assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class)); - assertThat(write.getNumShards().get(), equalTo(3)); - assertThat(write.getSharding(), equalTo(originalSharding)); + assertThat(write.getComputeNumShards(), is(nullValue())); + assertThat(write.getNumShardsProvider(), instanceOf(StaticValueProvider.class)); + assertThat(write.getNumShardsProvider().get(), equalTo(3)); + assertThat(write.getComputeNumShards(), equalTo(originalSharding)); WriteFiles<String, ?, ?> write2 = write.withSharding(SHARDING_TRANSFORM); assertThat((SimpleSink<Void>) write2.getSink(), is(sink)); - assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM)); + assertThat(write2.getComputeNumShards(), equalTo(SHARDING_TRANSFORM)); // original unchanged WriteFiles<String, ?, ?> writeUnsharded = write2.withRunnerDeterminedSharding(); - assertThat(writeUnsharded.getSharding(), nullValue()); - assertThat(write.getSharding(), equalTo(originalSharding)); + assertThat(writeUnsharded.getComputeNumShards(), nullValue()); + assertThat(write.getComputeNumShards(), equalTo(originalSharding)); } @Test @@ -669,10 +669,10 @@ public class WriteFilesTest { p.run(); Optional<Integer> numShards = - (write.getNumShards() != null && !write.isWindowedWrites()) - ? Optional.of(write.getNumShards().get()) + (write.getNumShardsProvider() != null && !write.getWindowedWrites()) + ? Optional.of(write.getNumShardsProvider().get()) : Optional.<Integer>absent(); - checkFileContents(baseName, inputs, numShards, !write.isWindowedWrites()); + checkFileContents(baseName, inputs, numShards, !write.getWindowedWrites()); } static void checkFileContents( -- To stop receiving notification emails like this one, please contact "commits@beam.apache.org" <commits@beam.apache.org>.