Repository: beam Updated Branches: refs/heads/master f870bf516 -> 8bd647596
Use dehydration-insensitive APIs in WindowEvaluatorFactory Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e38dc5fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e38dc5fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e38dc5fb Branch: refs/heads/master Commit: e38dc5fbe1f45fe282b4cd96b858631db73130e9 Parents: b6f126d Author: Kenneth Knowles <k...@google.com> Authored: Wed Jun 7 13:58:11 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Jul 21 11:36:21 2017 -0700 ---------------------------------------------------------------------- .../construction/WindowIntoTranslation.java | 46 +++++++++-- .../WindowingStrategyTranslation.java | 83 ++++++++++++-------- .../construction/WindowIntoTranslationTest.java | 2 +- .../runners/direct/WindowEvaluatorFactory.java | 5 +- 4 files changed, 95 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java index aa17bc9..6aec908 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -18,15 +18,17 @@ package org.apache.beam.runners.core.construction; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.auto.service.AutoService; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -64,10 +66,44 @@ public class WindowIntoTranslation { .build(); } - public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload) - throws InvalidProtocolBufferException { - SdkFunctionSpec spec = payload.getWindowFn(); - return WindowingStrategyTranslation.windowFnFromProto(spec); + public static WindowIntoPayload getWindowIntoPayload(AppliedPTransform<?, ?, ?> application) { + RunnerApi.PTransform transformProto; + try { + transformProto = + PTransformTranslation.toProto( + application, + Collections.<AppliedPTransform<?, ?, ?>>emptyList(), + SdkComponents.create()); + } catch (IOException exc) { + throw new RuntimeException(exc); + } + + checkArgument( + PTransformTranslation.WINDOW_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()), + "Illegal attempt to extract %s from transform %s with name \"%s\" and URN \"%s\"", + Window.Assign.class.getSimpleName(), + application.getTransform(), + application.getFullName(), + transformProto.getSpec().getUrn()); + + WindowIntoPayload windowIntoPayload; + try { + return transformProto.getSpec().getParameter().unpack(WindowIntoPayload.class); + } catch (InvalidProtocolBufferException exc) { + throw new IllegalStateException( + String.format( + "%s translated %s with URN '%s' but payload was not a %s", + PTransformTranslation.class.getSimpleName(), + application, + PTransformTranslation.WINDOW_TRANSFORM_URN, + WindowIntoPayload.class.getSimpleName()), + exc); + } + } + + public static WindowFn<?, ?> getWindowFn(AppliedPTransform<?, ?, ?> application) { + return WindowingStrategyTranslation.windowFnFromProto( + getWindowIntoPayload(application).getWindowFn()); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 1456a3f..046153d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -357,40 +357,55 @@ public class WindowingStrategyTranslation implements Serializable { .withOnTimeBehavior(onTimeBehavior); } - public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) - throws InvalidProtocolBufferException { - switch (windowFnSpec.getSpec().getUrn()) { - case GLOBAL_WINDOWS_FN: - return new GlobalWindows(); - case FIXED_WINDOWS_FN: - StandardWindowFns.FixedWindowsPayload fixedParams = - windowFnSpec.getSpec().getParameter().unpack( - StandardWindowFns.FixedWindowsPayload.class); - return FixedWindows.of( - Duration.millis(Durations.toMillis(fixedParams.getSize()))) - .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset()))); - case SLIDING_WINDOWS_FN: - StandardWindowFns.SlidingWindowsPayload slidingParams = - windowFnSpec.getSpec().getParameter().unpack( - StandardWindowFns.SlidingWindowsPayload.class); - return SlidingWindows.of( - Duration.millis(Durations.toMillis(slidingParams.getSize()))) - .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod()))) - .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset()))); - case SESSION_WINDOWS_FN: - StandardWindowFns.SessionsPayload sessionParams = - windowFnSpec.getSpec().getParameter().unpack( - StandardWindowFns.SessionsPayload.class); - return Sessions.withGapDuration( - Duration.millis(Durations.toMillis(sessionParams.getGapSize()))); - case SERIALIZED_JAVA_WINDOWFN_URN: - case OLD_SERIALIZED_JAVA_WINDOWFN_URN: - return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray( - windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(), - "WindowFn"); - default: - throw new IllegalArgumentException( - "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn()); + public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) { + try { + switch (windowFnSpec.getSpec().getUrn()) { + case GLOBAL_WINDOWS_FN: + return new GlobalWindows(); + case FIXED_WINDOWS_FN: + StandardWindowFns.FixedWindowsPayload fixedParams = + windowFnSpec + .getSpec() + .getParameter() + .unpack(StandardWindowFns.FixedWindowsPayload.class); + return FixedWindows.of(Duration.millis(Durations.toMillis(fixedParams.getSize()))) + .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset()))); + case SLIDING_WINDOWS_FN: + StandardWindowFns.SlidingWindowsPayload slidingParams = + windowFnSpec + .getSpec() + .getParameter() + .unpack(StandardWindowFns.SlidingWindowsPayload.class); + return SlidingWindows.of(Duration.millis(Durations.toMillis(slidingParams.getSize()))) + .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod()))) + .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset()))); + case SESSION_WINDOWS_FN: + StandardWindowFns.SessionsPayload sessionParams = + windowFnSpec.getSpec().getParameter().unpack(StandardWindowFns.SessionsPayload.class); + return Sessions.withGapDuration( + Duration.millis(Durations.toMillis(sessionParams.getGapSize()))); + case SERIALIZED_JAVA_WINDOWFN_URN: + case OLD_SERIALIZED_JAVA_WINDOWFN_URN: + return (WindowFn<?, ?>) + SerializableUtils.deserializeFromByteArray( + windowFnSpec + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + "WindowFn"); + default: + throw new IllegalArgumentException( + "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn()); + } + } catch (InvalidProtocolBufferException exc) { + throw new IllegalArgumentException( + String.format( + "%s for %s with URN %s did not contain expected proto message for payload", + FunctionSpec.class.getSimpleName(), + WindowFn.class.getSimpleName(), + windowFnSpec.getSpec().getUrn())); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java index cb9617a..3ba2d6f 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java @@ -95,7 +95,7 @@ public class WindowIntoTranslationTest { WindowIntoPayload payload = WindowIntoTranslation.toProto(assign.get().getTransform(), components); - assertEquals(windowFn, WindowIntoTranslation.getWindowFn(payload)); + assertEquals(windowFn, WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn())); } private static class CustomWindows extends PartitioningWindowFn<String, BoundedWindow> { http://git-wip-us.apache.org/repos/asf/beam/blob/e38dc5fb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index f4228d9..2d37b27 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.Collection; import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.WindowIntoTranslation; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -52,7 +53,9 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { private <InputT> TransformEvaluator<InputT> createTransformEvaluator( AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>> transform) { - WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn(); + + WindowFn<? super InputT, ?> fn = (WindowFn) WindowIntoTranslation.getWindowFn(transform); + UncommittedBundle<InputT> outputBundle = evaluationContext.createBundle( (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs().values()));