ibzib commented on a change in pull request #12218:
URL: https://github.com/apache/beam/pull/12218#discussion_r453048035
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
##########
@@ -1171,6 +1174,41 @@ public GroupGlobally(AssertionWindows rewindowingStrategy) {
public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
final int combinedKey = 42;
+ if (input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())
+ && rewindowingStrategy instanceof IntoGlobalWindow) {
+ // If we don't have to worry about complicated triggering semantics we can generate
+ // a much simpler pipeline. This is particularly useful for bootstrapping runners so that
+ // we can run subsets of the validates runner test suite requiring support of only the
+ // most basic primitives.
+
+ // In order to ensure we actually get an (empty) iterable rather than an empty PCollection
+ // when the input is an empty PCollection, we flatten with a dummy PCollection containing
+ // an empty iterable before grouping on a singleton key and concatenating.
+ PCollection<Iterable<ValueInSingleWindow<T>>> actual =
+ input.apply(Reify.windows()).apply(ParDo.of(new ToSingletonIterables<>()));
+ PCollection<Iterable<ValueInSingleWindow<T>>> dummy =
+ input
+ .getPipeline()
+ .apply(
+ Create.<Iterable<ValueInSingleWindow<T>>>of(
+ ImmutableList.of(ImmutableList.of()))
+ .withCoder(actual.getCoder()));
+ return PCollectionList.of(dummy)
+ .and(actual)
+ .apply(Flatten.pCollections())
+ .apply(
+ // Default end-of-window trigger dissallowed for unbounded PCollections.
Review comment:
```suggestion
// Default end-of-window trigger disallowed for unbounded PCollections.
```
Nit: spelling
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
##########
@@ -1171,6 +1174,41 @@ public GroupGlobally(AssertionWindows rewindowingStrategy) {
public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
final int combinedKey = 42;
+ if (input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())
+ && rewindowingStrategy instanceof IntoGlobalWindow) {
+ // If we don't have to worry about complicated triggering semantics we can generate
+ // a much simpler pipeline. This is particularly useful for bootstrapping runners so that
+ // we can run subsets of the validates runner test suite requiring support of only the
+ // most basic primitives.
+
+ // In order to ensure we actually get an (empty) iterable rather than an empty PCollection
+ // when the input is an empty PCollection, we flatten with a dummy PCollection containing
+ // an empty iterable before grouping on a singleton key and concatenating.
+ PCollection<Iterable<ValueInSingleWindow<T>>> actual =
+ input.apply(Reify.windows()).apply(ParDo.of(new ToSingletonIterables<>()));
+ PCollection<Iterable<ValueInSingleWindow<T>>> dummy =
+ input
+ .getPipeline()
+ .apply(
+ Create.<Iterable<ValueInSingleWindow<T>>>of(
+ ImmutableList.of(ImmutableList.of()))
+ .withCoder(actual.getCoder()));
+ return PCollectionList.of(dummy)
+ .and(actual)
+ .apply(Flatten.pCollections())
+ .apply(
+ // Default end-of-window trigger dissallowed for unbounded PCollections.
Review comment:
Why does this only apply to unbounded PCollections? It looks like the
existing translation removes triggers for both bounded and unbounded.
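
For context on the quoted hunk: its code comment says the dummy PCollection ensures the result is an (empty) iterable rather than an empty PCollection when the input is empty. The following is a minimal plain-Java sketch of that idea only, assuming ordinary lists as stand-ins for PCollections. `EmptyIterableSketch` and `groupGlobally` are hypothetical names, not Beam APIs, and the sketch does not model windowing or triggers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EmptyIterableSketch {
    // Hypothetical stand-in for "flatten with a dummy, group on one key, concatenate".
    static <T> List<List<T>> groupGlobally(List<T> input) {
        List<List<T>> parts = new ArrayList<>();
        // The dummy contribution: one empty list that is always present.
        parts.add(new ArrayList<T>());
        // Each input element becomes a singleton list.
        for (T element : input) {
            parts.add(Collections.singletonList(element));
        }
        // Concatenate every part under a single implicit key.
        List<T> combined = new ArrayList<>();
        for (List<T> part : parts) {
            combined.addAll(part);
        }
        // Exactly one output element, even when the input had none.
        List<List<T>> output = new ArrayList<>();
        output.add(combined);
        return output;
    }

    public static void main(String[] args) {
        System.out.println(groupGlobally(Collections.emptyList())); // prints [[]]
        System.out.println(groupGlobally(Arrays.asList(1, 2)));     // prints [[1, 2]]
    }
}
```

Without the always-present empty part, an empty input would produce no output element at all, so an assertion over "the iterable" would have nothing to run against.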