[ 
https://issues.apache.org/jira/browse/BEAM-5931?focusedWorklogId=164316&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-164316
 ]

ASF GitHub Bot logged work on BEAM-5931:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Nov/18 11:23
            Start Date: 09/Nov/18 11:23
    Worklog Time Spent: 10m 
      Work Description: mxm closed pull request #6990: [BEAM-5931] Revert "Revert "[BEAM-5299] Define max timestamp for global window in…
URL: https://github.com/apache/beam/pull/6990
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 194797f166c..4b537b0c4f6 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -32,6 +32,22 @@ option java_outer_classname = "RunnerApi";
 import "google/protobuf/any.proto";
 import "google/protobuf/descriptor.proto";
 
+message BeamConstants {
+  enum Constants {
+    // All timestamps in milliseconds since Jan 1, 1970.
+    MIN_TIMESTAMP_MILLIS = 0 [(beam_constant) = "-9223372036854775"];
+    MAX_TIMESTAMP_MILLIS = 1 [(beam_constant) =  "9223372036854775"];
+    // The maximum timestamp for the global window.
+    // Triggers use maxTimestamp to set timers' timestamp. Timers fires when
+    // the watermark passes their timestamps. So, the timestamp needs to be
+    // smaller than the MAX_TIMESTAMP_MILLIS.
+    // One standard day is subtracted from MAX_TIMESTAMP_MILLIS to make sure
+    // the maxTimestamp is smaller than MAX_TIMESTAMP_MILLIS even after rounding up
+    // to seconds or minutes. See also GlobalWindow in the Java SDK.
+    GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS = 2 [(beam_constant) = "9223371950454775"];
+  }
+}
+
 // A set of mappings from id to message. This is included as an optional field
 // on any proto message that may contain references needing resolution.
 message Components {
@@ -1065,6 +1081,8 @@ extend google.protobuf.EnumValueOptions {
   //   }
   // }
   string beam_urn = 185324356;
+  // A value to store other constants
+  string beam_constant = 185324357;
 }
 
 // A URN along with a parameter object whose schema is determined by the
diff --git a/sdks/go/pkg/beam/core/graph/mtime/time.go b/sdks/go/pkg/beam/core/graph/mtime/time.go
index 627deb9c075..7ef82fa9b30 100644
--- a/sdks/go/pkg/beam/core/graph/mtime/time.go
+++ b/sdks/go/pkg/beam/core/graph/mtime/time.go
@@ -37,6 +37,7 @@ const (
 
        // EndOfGlobalWindowTime is the timestamp at the end of the global window. It
        // is a day before the max timestamp.
+       // TODO Use GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS from the Runner API constants
        EndOfGlobalWindowTime = MaxTimestamp - 24*60*60*1000
 
        // ZeroTimestamp is the default zero value time. It corresponds to the unix epoch.
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 2b976a285ef..61c4d66956b 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -51,6 +51,8 @@ test {
 }
 
 dependencies {
+  // Required to load constants from the model, e.g. max timestamp for global window
+  shadow project(path: ":beam-model-pipeline", configuration: "shadow")
   compile library.java.guava
   compile library.java.protobuf_java
   compile library.java.byte_buddy
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index eb9ae7fee29..1d6b13c49dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.transforms.windowing;
 
-import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.joda.time.Instant;
 
 /**
@@ -46,7 +46,7 @@
    * microseconds-since-epoch can be safely represented with a {@code long}.
    */
   public static final Instant TIMESTAMP_MIN_VALUE =
-      new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+      extractTimestampFromProto(RunnerApi.BeamConstants.Constants.MIN_TIMESTAMP_MILLIS);
 
   /**
    * The maximum value for any Beam timestamp. Often referred to as "+infinity".
@@ -55,7 +55,7 @@
    * microseconds-since-epoch can be safely represented with a {@code long}.
    */
   public static final Instant TIMESTAMP_MAX_VALUE =
-      new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+      extractTimestampFromProto(RunnerApi.BeamConstants.Constants.MAX_TIMESTAMP_MILLIS);
 
   /**
    * Formats a {@link Instant} timestamp with additional Beam-specific metadata, such as indicating
@@ -76,4 +76,11 @@ public static String formatTimestamp(Instant timestamp) {
 
   /** Returns the inclusive upper bound of timestamps for values in this window. */
   public abstract Instant maxTimestamp();
+
+  /** Parses a timestamp from the proto. */
+  private static Instant extractTimestampFromProto(RunnerApi.BeamConstants.Constants constant) {
+    return new Instant(
+        Long.parseLong(
+            constant.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant)));
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 669365ba150..2bae2dc38eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -21,8 +21,8 @@
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.sdk.coders.StructuredCoder;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /** The default window into which all data is placed (via {@link GlobalWindows}). */
@@ -36,8 +36,7 @@
   // One standard day is subtracted from TIMESTAMP_MAX_VALUE to make sure
   // the maxTimestamp is smaller than TIMESTAMP_MAX_VALUE even after rounding up
   // to seconds or minutes.
-  private static final Instant END_OF_GLOBAL_WINDOW =
-      TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1));
+  private static final Instant END_OF_GLOBAL_WINDOW = extractMaxTimestampFromProto();
 
   @Override
   public Instant maxTimestamp() {
@@ -83,4 +82,14 @@ public boolean consistentWithEquals() {
 
     private Coder() {}
   }
+
+  /** Parses the max timestamp for global windows from the proto. */
+  private static Instant extractMaxTimestampFromProto() {
+    return new Instant(
+        Long.parseLong(
+            RunnerApi.BeamConstants.Constants.GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS
+                .getValueDescriptor()
+                .getOptions()
+                .getExtension(RunnerApi.beamConstant)));
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index 413901da1d2..856e68ee875 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -46,6 +46,7 @@ public static ApiSurface getSdkApiSurface(final ClassLoader classLoader) throws
         
.pruningClassName("org.apache.beam.sdk.testing.InterceptingUrlClassLoader")
         // test only
         .pruningPrefix("org.apache.beam.model.")
+        .pruningPrefix("org.apache.beam.vendor.")
         .pruningPrefix("java");
   }
 
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 425137c8634..5ce7f9d0c5a 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -51,6 +51,7 @@
 from apache_beam.transforms import window
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.display import HasDisplayData
+from apache_beam.utils import timestamp
 from apache_beam.utils import urns
 from apache_beam.utils.windowed_value import WindowedValue
 
@@ -1052,7 +1053,7 @@ def process(self, element, init_result):
     writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
     for e in bundle[1]:  # values
       writer.write(e)
-    return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)]
+    return [window.TimestampedValue(writer.close(), timestamp.MAX_TIMESTAMP)]
 
 
 def _pre_finalize(unused_element, sink, init_result, write_results):
@@ -1072,7 +1073,8 @@ def _finalize_write(unused_element, sink, init_result, write_results,
   outputs = sink.finalize_write(init_result, write_results + extra_shards,
                                 pre_finalize_results)
   if outputs:
-    return (window.TimestampedValue(v, window.MAX_TIMESTAMP) for v in outputs)
+    return (
+        window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
 
 
 class _RoundRobinKeyFn(core.DoFn):
diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py
index 793bfcbf5df..4ee32a7e31a 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -30,6 +30,9 @@ class PropertiesFromEnumValue(object):
   def __init__(self, value_descriptor):
     self.urn = (
         value_descriptor.GetOptions().Extensions[beam_runner_api_pb2.beam_urn])
+    self.constant = (
+        value_descriptor.GetOptions().Extensions[
+            beam_runner_api_pb2.beam_constant])
 
 
 class PropertiesFromEnumType(object):
@@ -52,6 +55,9 @@ def __init__(self, enum_type):
 
 coders = PropertiesFromEnumType(beam_runner_api_pb2.StandardCoders.Enum)
 
+constants = PropertiesFromEnumType(
+    beam_runner_api_pb2.BeamConstants.Constants)
+
 environments = PropertiesFromEnumType(
     beam_runner_api_pb2.StandardEnvironments.Environments)
 
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index e70c8ef4053..0970a28d685 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -66,7 +66,6 @@
 from apache_beam.transforms import timeutil
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
-from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.timestamp import Duration
 from apache_beam.utils.timestamp import Timestamp
@@ -295,11 +294,6 @@ def __lt__(self, other):
 class GlobalWindow(BoundedWindow):
   """The default window into which all data is placed (via GlobalWindows)."""
   _instance = None
-  # The maximum timestamp for global windows is MAX_TIMESTAMP - 1 day.
-  # This is due to timers triggering when the watermark passes the trigger
-  # time, which is only possible for timestamps < MAX_TIMESTAMP.
-  # See also GlobalWindow in the Java SDK.
-  _END_OF_GLOBAL_WINDOW = MAX_TIMESTAMP - (24 * 60 * 60)
 
   def __new__(cls):
     if cls._instance is None:
@@ -307,7 +301,7 @@ def __new__(cls):
     return cls._instance
 
   def __init__(self):
-    super(GlobalWindow, self).__init__(GlobalWindow._END_OF_GLOBAL_WINDOW)
+    super(GlobalWindow, self).__init__(GlobalWindow._getTimestampFromProto())
     self.start = MIN_TIMESTAMP
 
   def __repr__(self):
@@ -323,6 +317,12 @@ def __eq__(self, other):
   def __ne__(self, other):
     return not self == other
 
+  @staticmethod
+  def _getTimestampFromProto():
+    ts_millis = int(
+        common_urns.constants.GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS.constant)
+    return Timestamp(micros=ts_millis*1000)
+
 
 class NonMergingWindowFn(WindowFn):
 
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index cdd633e3388..2ed775c8567 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -31,6 +31,8 @@
 import pytz
 from past.builtins import long
 
+from apache_beam.portability import common_urns
+
 
 @functools.total_ordering
 class Timestamp(object):
@@ -181,8 +183,10 @@ def __mod__(self, other):
     return Duration(micros=self.micros % other.micros)
 
 
-MIN_TIMESTAMP = Timestamp(micros=-0x7fffffffffffffff - 1)
-MAX_TIMESTAMP = Timestamp(micros=0x7fffffffffffffff)
+MIN_TIMESTAMP = Timestamp(micros=int(
+    common_urns.constants.MIN_TIMESTAMP_MILLIS.constant)*1000)
+MAX_TIMESTAMP = Timestamp(micros=int(
+    common_urns.constants.MAX_TIMESTAMP_MILLIS.constant)*1000)
 
 
 @functools.total_ordering


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 164316)
    Time Spent: 8h 40m  (was: 8.5h)

> Rollback PR/6899
> ----------------
>
>                 Key: BEAM-5931
>                 URL: https://issues.apache.org/jira/browse/BEAM-5931
>             Project: Beam
>          Issue Type: Task
>          Components: beam-model, runner-dataflow
>            Reporter: Luke Cwik
>            Assignee: Ruoyun Huang
>            Priority: Major
>          Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> To roll back this change, one must either:
> 1) Update nexmark / perf test framework to use Dataflow worker jar.
> This requires adding the following flags
> {code}
>      "--dataflowWorkerJar=${dataflowWorkerJar}",
>      "--workerHarnessContainerImage=",
> {code}
> when running the tests.
> OR
> 2) Update the Dataflow worker image with code that contains the rollback of
> PR/6899, and then roll back PR/6899 in GitHub with the updated Dataflow worker
> image.
> #1 is preferable since we will no longer have tests running that don't use a
> Dataflow worker jar built from GitHub HEAD.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to