reuvenlax commented on a change in pull request #4145: Many simplifications to WriteFiles
URL: https://github.com/apache/beam/pull/4145#discussion_r153034261
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -339,50 +297,189 @@ public boolean isWindowedWrites() {
sink, computeNumShards, numShardsProvider, true,
maxNumWritersPerBundle, sideInputs);
}
- private static class WriterKey<DestinationT> {
- private final BoundedWindow window;
- private final PaneInfo paneInfo;
- private final DestinationT destination;
+ @Override
+ public void validate(PipelineOptions options) {
+ sink.validate(options);
+ }
- WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
- this.window = window;
- this.paneInfo = paneInfo;
- this.destination = destination;
+ @Override
+ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
+ if (input.isBounded() == IsBounded.UNBOUNDED) {
+ checkArgument(
+ windowedWrites,
+ "Must use windowed writes when applying %s to an unbounded
PCollection",
+ WriteFiles.class.getSimpleName());
+ }
+ if (windowedWrites) {
+ // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
+ // and similar behavior in other runners.
+ checkArgument(
+ computeNumShards != null || numShardsProvider != null,
+ "When using windowed writes, must specify number of output shards
explicitly",
+ WriteFiles.class.getSimpleName());
}
+ this.writeOperation = sink.createWriteOperation();
+ this.writeOperation.setWindowedWrites(windowedWrites);
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof WriterKey)) {
- return false;
- }
- WriterKey other = (WriterKey) o;
- return Objects.equal(window, other.window)
- && Objects.equal(paneInfo, other.paneInfo)
- && Objects.equal(destination, other.destination);
+ if (!windowedWrites) {
+ // Re-window the data into the global window and remove any existing triggers.
+ input =
+ input.apply(
+ "RewindowIntoGlobal",
+ Window.<UserT>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+ }
+
+ Coder<DestinationT> destinationCoder;
+ try {
+ destinationCoder =
+ getDynamicDestinations()
+ .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+ destinationCoder.verifyDeterministic();
+ } catch (CannotProvideCoderException | NonDeterministicException e) {
+ throw new RuntimeException(e);
+ }
+ @SuppressWarnings("unchecked")
+ Coder<BoundedWindow> windowCoder =
+ (Coder<BoundedWindow>)
input.getWindowingStrategy().getWindowFn().windowCoder();
+ FileResultCoder<DestinationT> fileResultCoder =
+ FileResultCoder.of(windowCoder, destinationCoder);
+
+ PCollectionView<Integer> numShardsView =
+ (computeNumShards == null) ? null : input.apply(computeNumShards);
+
+ PCollection<FileResult<DestinationT>> tempFileResults =
+ (computeNumShards == null && numShardsProvider == null)
+ ? input.apply(
+ "WriteUnshardedBundlesToTempFiles",
Review comment:
Unfortunately, refactoring into new PTransforms changes the name of every single sub-step (since step names are hierarchical).
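To illustrate what "hierarchical" means here, a minimal sketch (the composite class and step names below are hypothetical, not the actual transforms in this PR): Beam composes a step's full name from the names of all enclosing composite transforms, so moving an existing ParDo under a new composite prefixes its name with the composite's name.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical composite used only to illustrate hierarchical step naming.
class IllustrativeComposite extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> input) {
    // This ParDo's fully qualified step name becomes "<outer name>/WriteBundles",
    // where <outer name> is the name given when this composite is applied.
    return input.apply(
        "WriteBundles",
        ParDo.of(
            new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                c.output(c.element());
              }
            }));
  }
}
```

Applying it as `input.apply("WriteUnshardedBundlesToTempFiles", new IllustrativeComposite())` would give the inner step the full name "WriteUnshardedBundlesToTempFiles/WriteBundles", so wrapping pre-existing steps in a new composite changes their fully qualified names.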
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services