[ https://issues.apache.org/jira/browse/BEAM-5931?focusedWorklogId=164316&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-164316 ]
ASF GitHub Bot logged work on BEAM-5931: ---------------------------------------- Author: ASF GitHub Bot Created on: 09/Nov/18 11:23 Start Date: 09/Nov/18 11:23 Worklog Time Spent: 10m Work Description: mxm closed pull request #6990: [BEAM-5931]Revert "Revert "[BEAM-5299] Define max timestamp for global window in… URL: https://github.com/apache/beam/pull/6990 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 194797f166c..4b537b0c4f6 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -32,6 +32,22 @@ option java_outer_classname = "RunnerApi"; import "google/protobuf/any.proto"; import "google/protobuf/descriptor.proto"; +message BeamConstants { + enum Constants { + // All timestamps in milliseconds since Jan 1, 1970. + MIN_TIMESTAMP_MILLIS = 0 [(beam_constant) = "-9223372036854775"]; + MAX_TIMESTAMP_MILLIS = 1 [(beam_constant) = "9223372036854775"]; + // The maximum timestamp for the global window. + // Triggers use maxTimestamp to set timers' timestamp. Timers fires when + // the watermark passes their timestamps. So, the timestamp needs to be + // smaller than the MAX_TIMESTAMP_MILLIS. + // One standard day is subtracted from MAX_TIMESTAMP_MILLIS to make sure + // the maxTimestamp is smaller than MAX_TIMESTAMP_MILLIS even after rounding up + // to seconds or minutes. See also GlobalWindow in the Java SDK. + GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS = 2 [(beam_constant) = "9223371950454775"]; + } +} + // A set of mappings from id to message. This is included as an optional field // on any proto message that may contain references needing resolution. 
message Components { @@ -1065,6 +1081,8 @@ extend google.protobuf.EnumValueOptions { // } // } string beam_urn = 185324356; + // A value to store other constants + string beam_constant = 185324357; } // A URN along with a parameter object whose schema is determined by the diff --git a/sdks/go/pkg/beam/core/graph/mtime/time.go b/sdks/go/pkg/beam/core/graph/mtime/time.go index 627deb9c075..7ef82fa9b30 100644 --- a/sdks/go/pkg/beam/core/graph/mtime/time.go +++ b/sdks/go/pkg/beam/core/graph/mtime/time.go @@ -37,6 +37,7 @@ const ( // EndOfGlobalWindowTime is the timestamp at the end of the global window. It // is a day before the max timestamp. + // TODO Use GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS from the Runner API constants EndOfGlobalWindowTime = MaxTimestamp - 24*60*60*1000 // ZeroTimestamp is the default zero value time. It corresponds to the unix epoch. diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 2b976a285ef..61c4d66956b 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -51,6 +51,8 @@ test { } dependencies { + // Required to load constants from the model, e.g. 
max timestamp for global window + shadow project(path: ":beam-model-pipeline", configuration: "shadow") compile library.java.guava compile library.java.protobuf_java compile library.java.byte_buddy diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java index eb9ae7fee29..1d6b13c49dd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.transforms.windowing; -import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.joda.time.Instant; /** @@ -46,7 +46,7 @@ * microseconds-since-epoch can be safely represented with a {@code long}. */ public static final Instant TIMESTAMP_MIN_VALUE = - new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)); + extractTimestampFromProto(RunnerApi.BeamConstants.Constants.MIN_TIMESTAMP_MILLIS); /** * The maximum value for any Beam timestamp. Often referred to as "+infinity". @@ -55,7 +55,7 @@ * microseconds-since-epoch can be safely represented with a {@code long}. */ public static final Instant TIMESTAMP_MAX_VALUE = - new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); + extractTimestampFromProto(RunnerApi.BeamConstants.Constants.MAX_TIMESTAMP_MILLIS); /** * Formats a {@link Instant} timestamp with additional Beam-specific metadata, such as indicating @@ -76,4 +76,11 @@ public static String formatTimestamp(Instant timestamp) { /** Returns the inclusive upper bound of timestamps for values in this window. */ public abstract Instant maxTimestamp(); + + /** Parses a timestamp from the proto. 
*/ + private static Instant extractTimestampFromProto(RunnerApi.BeamConstants.Constants constant) { + return new Instant( + Long.parseLong( + constant.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant))); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java index 669365ba150..2bae2dc38eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java @@ -21,8 +21,8 @@ import java.io.OutputStream; import java.util.Collections; import java.util.List; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.coders.StructuredCoder; -import org.joda.time.Duration; import org.joda.time.Instant; /** The default window into which all data is placed (via {@link GlobalWindows}). */ @@ -36,8 +36,7 @@ // One standard day is subtracted from TIMESTAMP_MAX_VALUE to make sure // the maxTimestamp is smaller than TIMESTAMP_MAX_VALUE even after rounding up // to seconds or minutes. - private static final Instant END_OF_GLOBAL_WINDOW = - TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)); + private static final Instant END_OF_GLOBAL_WINDOW = extractMaxTimestampFromProto(); @Override public Instant maxTimestamp() { @@ -83,4 +82,14 @@ public boolean consistentWithEquals() { private Coder() {} } + + /** Parses the max timestamp for global windows from the proto. 
*/ + private static Instant extractMaxTimestampFromProto() { + return new Instant( + Long.parseLong( + RunnerApi.BeamConstants.Constants.GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamConstant))); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java index 413901da1d2..856e68ee875 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java @@ -46,6 +46,7 @@ public static ApiSurface getSdkApiSurface(final ClassLoader classLoader) throws .pruningClassName("org.apache.beam.sdk.testing.InterceptingUrlClassLoader") // test only .pruningPrefix("org.apache.beam.model.") + .pruningPrefix("org.apache.beam.vendor.") .pruningPrefix("java"); } diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 425137c8634..5ce7f9d0c5a 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -51,6 +51,7 @@ from apache_beam.transforms import window from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.display import HasDisplayData +from apache_beam.utils import timestamp from apache_beam.utils import urns from apache_beam.utils.windowed_value import WindowedValue @@ -1052,7 +1053,7 @@ def process(self, element, init_result): writer = self.sink.open_writer(init_result, str(uuid.uuid4())) for e in bundle[1]: # values writer.write(e) - return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)] + return [window.TimestampedValue(writer.close(), timestamp.MAX_TIMESTAMP)] def _pre_finalize(unused_element, sink, init_result, write_results): @@ -1072,7 +1073,8 @@ def _finalize_write(unused_element, sink, init_result, write_results, outputs = sink.finalize_write(init_result, write_results + extra_shards, 
pre_finalize_results) if outputs: - return (window.TimestampedValue(v, window.MAX_TIMESTAMP) for v in outputs) + return ( + window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs) class _RoundRobinKeyFn(core.DoFn): diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 793bfcbf5df..4ee32a7e31a 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -30,6 +30,9 @@ class PropertiesFromEnumValue(object): def __init__(self, value_descriptor): self.urn = ( value_descriptor.GetOptions().Extensions[beam_runner_api_pb2.beam_urn]) + self.constant = ( + value_descriptor.GetOptions().Extensions[ + beam_runner_api_pb2.beam_constant]) class PropertiesFromEnumType(object): @@ -52,6 +55,9 @@ def __init__(self, enum_type): coders = PropertiesFromEnumType(beam_runner_api_pb2.StandardCoders.Enum) +constants = PropertiesFromEnumType( + beam_runner_api_pb2.BeamConstants.Constants) + environments = PropertiesFromEnumType( beam_runner_api_pb2.StandardEnvironments.Environments) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index e70c8ef4053..0970a28d685 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -66,7 +66,6 @@ from apache_beam.transforms import timeutil from apache_beam.utils import proto_utils from apache_beam.utils import urns -from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import Duration from apache_beam.utils.timestamp import Timestamp @@ -295,11 +294,6 @@ def __lt__(self, other): class GlobalWindow(BoundedWindow): """The default window into which all data is placed (via GlobalWindows).""" _instance = None - # The maximum timestamp for global windows is MAX_TIMESTAMP - 1 day. 
- # This is due to timers triggering when the watermark passes the trigger - # time, which is only possible for timestamps < MAX_TIMESTAMP. - # See also GlobalWindow in the Java SDK. - _END_OF_GLOBAL_WINDOW = MAX_TIMESTAMP - (24 * 60 * 60) def __new__(cls): if cls._instance is None: @@ -307,7 +301,7 @@ def __new__(cls): return cls._instance def __init__(self): - super(GlobalWindow, self).__init__(GlobalWindow._END_OF_GLOBAL_WINDOW) + super(GlobalWindow, self).__init__(GlobalWindow._getTimestampFromProto()) self.start = MIN_TIMESTAMP def __repr__(self): @@ -323,6 +317,12 @@ def __eq__(self, other): def __ne__(self, other): return not self == other + @staticmethod + def _getTimestampFromProto(): + ts_millis = int( + common_urns.constants.GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS.constant) + return Timestamp(micros=ts_millis*1000) + class NonMergingWindowFn(WindowFn): diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py index cdd633e3388..2ed775c8567 100644 --- a/sdks/python/apache_beam/utils/timestamp.py +++ b/sdks/python/apache_beam/utils/timestamp.py @@ -31,6 +31,8 @@ import pytz from past.builtins import long +from apache_beam.portability import common_urns + @functools.total_ordering class Timestamp(object): @@ -181,8 +183,10 @@ def __mod__(self, other): return Duration(micros=self.micros % other.micros) -MIN_TIMESTAMP = Timestamp(micros=-0x7fffffffffffffff - 1) -MAX_TIMESTAMP = Timestamp(micros=0x7fffffffffffffff) +MIN_TIMESTAMP = Timestamp(micros=int( + common_urns.constants.MIN_TIMESTAMP_MILLIS.constant)*1000) +MAX_TIMESTAMP = Timestamp(micros=int( + common_urns.constants.MAX_TIMESTAMP_MILLIS.constant)*1000) @functools.total_ordering ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 164316) Time Spent: 8h 40m (was: 8.5h) > Rollback PR/6899 > ---------------- > > Key: BEAM-5931 > URL: https://issues.apache.org/jira/browse/BEAM-5931 > Project: Beam > Issue Type: Task > Components: beam-model, runner-dataflow > Reporter: Luke Cwik > Assignee: Ruoyun Huang > Priority: Major > Time Spent: 8h 40m > Remaining Estimate: 0h > > To roll back this change, one must either: > 1) Update nexmark / perf test framework to use the Dataflow worker jar. > This requires adding the > {code} > "--dataflowWorkerJar=${dataflowWorkerJar}", > "--workerHarnessContainerImage=", > {code} > when running the tests. > OR > 2) Update the dataflow worker image with code that contains the rollback of > PR/6899 and then roll back PR/6899 in GitHub with the updated Dataflow worker > image. > #1 is preferable since we will no longer have tests running that don't use a > Dataflow worker jar built from GitHub HEAD. -- This message was sent by Atlassian JIRA (v7.6.3#76005)