[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=392857=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392857 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 25/Feb/20 19:20
Start Date: 25/Feb/20 19:20
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10957: [BEAM-8537] Use NoOpWatermarkEstimator in sdf_direct_runner
URL: https://github.com/apache/beam/pull/10957

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 392857)
Time Spent: 18.5h (was: 18h 20m)

> Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
>
>
> Key: BEAM-8537
> URL: https://issues.apache.org/jira/browse/BEAM-8537
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.20.0
>
> Time Spent: 18.5h
> Remaining Estimate: 0h
>
> This is a follow-up to the in-progress PR
> https://github.com/apache/beam/pull/9794.
> The current implementation in PR 9794 provides a default implementation of
> WatermarkEstimator. For further work, we want to make WatermarkEstimator
> a pure interface. We'll provide a WatermarkEstimatorProvider that can
> create a custom WatermarkEstimator per windowed value. It should be similar
> to how we track restrictions for SDFs:
> WatermarkEstimator <---> RestrictionTracker
> WatermarkEstimatorProvider <---> RestrictionTrackerProvider
> WatermarkEstimatorParam <---> RestrictionDoFnParam

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
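The Estimator/Provider pairing described in the issue can be sketched with simplified, stdlib-only stand-ins. The method names (`observe_timestamp`, `current_watermark`, `get_estimator_state`, `initial_estimator_state`, `create_watermark_estimator`) follow the watermark_estimators.py diff quoted later in this thread, but these classes are not Beam's; `MaxSeenWatermarkEstimator` and `MaxSeenProvider` are hypothetical and deliberately naive (a "largest timestamp seen" watermark is not safe for out-of-order output).

```python
import abc


class WatermarkEstimator(abc.ABC):
  """Stand-in for the pure interface: tracks a watermark for one element."""

  @abc.abstractmethod
  def observe_timestamp(self, timestamp):
    """Called with the timestamp of each output record."""

  @abc.abstractmethod
  def current_watermark(self):
    """Returns the current watermark estimate."""

  @abc.abstractmethod
  def get_estimator_state(self):
    """Returns state to checkpoint so the estimator can be rebuilt later."""


class WatermarkEstimatorProvider(abc.ABC):
  """Stand-in provider: builds one estimator per (element, restriction)."""

  @abc.abstractmethod
  def initial_estimator_state(self, element, restriction):
    """Returns the state used for a fresh element/restriction pair."""

  @abc.abstractmethod
  def create_watermark_estimator(self, estimator_state):
    """Recreates an estimator from (possibly checkpointed) state."""


class MaxSeenWatermarkEstimator(WatermarkEstimator):
  """Hypothetical estimator: the watermark is the largest timestamp seen."""

  def __init__(self, state):
    self._watermark = state

  def observe_timestamp(self, timestamp):
    if self._watermark is None or timestamp > self._watermark:
      self._watermark = timestamp

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    return self._watermark


class MaxSeenProvider(WatermarkEstimatorProvider):
  """Hypothetical provider for MaxSeenWatermarkEstimator."""

  def initial_estimator_state(self, element, restriction):
    return None  # nothing observed yet for a fresh pair

  def create_watermark_estimator(self, estimator_state):
    return MaxSeenWatermarkEstimator(estimator_state)


provider = MaxSeenProvider()
# First attempt: start from the provider's initial state.
estimator = provider.create_watermark_estimator(
    provider.initial_estimator_state('element', (0, 10)))
for ts in (3, 7, 5):
  estimator.observe_timestamp(ts)
# On a checkpoint, the state is saved alongside the residual restriction...
saved_state = estimator.get_estimator_state()
# ...and a new estimator is rebuilt from it when processing resumes.
resumed = provider.create_watermark_estimator(saved_state)
print(resumed.current_watermark())  # 7
```

As with RestrictionTracker, the runner only persists the provider's state, never the estimator object itself, which is what lets the estimator stay a pure interface.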
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=392856=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392856 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 25/Feb/20 19:20
Start Date: 25/Feb/20 19:20
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10957: [BEAM-8537] Use NoOpWatermarkEstimator in sdf_direct_runner
URL: https://github.com/apache/beam/pull/10957#issuecomment-591022562

Thanks for your quick approval!

Issue Time Tracking
---
Worklog Id: (was: 392856)
Time Spent: 18h 20m (was: 18h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=392198=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392198 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 24/Feb/20 23:50
Start Date: 24/Feb/20 23:50
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10933: [BEAM-8537] Update docstring of ManualWatermarkEstimator.set_watermark()
URL: https://github.com/apache/beam/pull/10933

Issue Time Tracking
---
Worklog Id: (was: 392198)
Time Spent: 18h 10m (was: 18h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=392195=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392195 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 24/Feb/20 23:50
Start Date: 24/Feb/20 23:50
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10933: [BEAM-8537] Update docstring of ManualWatermarkEstimator.set_watermark()
URL: https://github.com/apache/beam/pull/10933#issuecomment-590607584

All tests passed. I'm going to merge it.

Issue Time Tracking
---
Worklog Id: (was: 392195)
Time Spent: 18h (was: 17h 50m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=390973=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390973 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 21/Feb/20 23:40
Start Date: 21/Feb/20 23:40
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #10933: [BEAM-8537] Update docstring of ManualWatermarkEstimator.set_watermark()
URL: https://github.com/apache/beam/pull/10933#issuecomment-589882455

Fix python formatting.

Issue Time Tracking
---
Worklog Id: (was: 390973)
Time Spent: 17h 50m (was: 17h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=390964=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390964 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 21/Feb/20 23:20
Start Date: 21/Feb/20 23:20
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10933: [BEAM-8537] Update docstring of ManualWatermarkEstimator.set_watermark()
URL: https://github.com/apache/beam/pull/10933

Follow up with Luke's comment: https://github.com/apache/beam/pull/10375#discussion_r382727370

R: @lukecwik
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=390857=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390857 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 21/Feb/20 19:18
Start Date: 21/Feb/20 19:18
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375

Issue Time Tracking
---
Worklog Id: (was: 390857)
Time Spent: 17.5h (was: 17h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=390824=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390824 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 21/Feb/20 18:11
Start Date: 21/Feb/20 18:11
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-589769206

Run Portable_Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 390824)
Time Spent: 17h 20m (was: 17h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=390820=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390820 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 21/Feb/20 18:10
Start Date: 21/Feb/20 18:10
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r382723627

## File path: sdks/python/apache_beam/io/watermark_estimators.py ##
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.transforms.core import WatermarkEstimatorProvider
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all ouput records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new pair, the initial value is None. When
+    resuming processing, the initial timestamp will be the last reported
+    watermark.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      # TODO(BEAM-9312): Consider making it configurable to deal with late
+      # timestamp.
+      if timestamp < self._watermark:
+        raise ValueError(
+            'A MonotonicWatermarkEstimator expects output '
+            'timestamp to be increasing monotonically.')
+      self._watermark = timestamp
+
+  def current_watermark(self):
+    return self._watermark
+
+  def get_estimator_state(self):
+    return self._watermark
+
+  @staticmethod
+  def default_provider():
+    """Provide a default WatermarkEstimatorProvider for
+    MonotonicWatermarkEstimator.
+    """
+    class DefaultMonotonicWatermarkEstimator(WatermarkEstimatorProvider):
+      def initial_estimator_state(self, element, restriction):
+        return None
+
+      def create_watermark_estimator(self, estimator_state):
+        return MonotonicWatermarkEstimator(estimator_state)
+
+    return DefaultMonotonicWatermarkEstimator()
+
+
+class WalltimeWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which uses processing time as the estimated watermark.
+  """
+  def __init__(self, timestamp=None):
+    self._timestamp = timestamp or Timestamp.now()
+
+  def observe_timestamp(self, timestamp):
+    pass
+
+  def current_watermark(self):
+    self._timestamp = max(self._timestamp, Timestamp.now())
+    return self._timestamp
+
+  def get_estimator_state(self):
+    return self._timestamp
+
+  @staticmethod
+  def default_provider():
+    """Provide a default WatermarkEstimatorProvider for
+    WalltimeWatermarkEstimator.
+    """
+    class DefaultWalltimeWatermarkEstimator(WatermarkEstimatorProvider):
+      def initial_estimator_state(self, element, restriction):
+        return None
+
+      def create_watermark_estimator(self, estimator_state):
+        return WalltimeWatermarkEstimator(estimator_state)
+
+    return DefaultWalltimeWatermarkEstimator()
+
+
+class ManualWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which is controlled manually from within a DoFn.
+
+  The DoFn must invoke set_watermark to advance the watermark.
+  """
+  def __init__(self, watermark):
+    self._watermark = watermark
+
+  def observe_timestamp(self, timestamp):
+    pass
+
+  def current_watermark(self):
+    return self._watermark
+
+  def get_estimator_state(self):
+    return self._watermark
+
+  def set_watermark(self, timestamp):
+    # Please call set_watermark after calling restriction_tracker.try_claim() to
+    # prevent advancing watermark early.
+    # TODO(BEAM-7473): It's possible that getting a slightly stale watermark

Review comment:
I would drop the TODO and close the
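The ordering rule in the `set_watermark` comment above (call it only after `restriction_tracker.try_claim()` succeeds) can be illustrated with stand-ins. `ManualWatermarkEstimator` below mirrors the quoted diff but is a local copy whose `set_watermark` body is assumed, since the quoted hunk is cut off; `OffsetTracker` and `read_records` are hypothetical, and the sketch assumes event times increase with offset.

```python
class ManualWatermarkEstimator(object):
  """Stand-in mirroring the quoted diff: the watermark only moves when
  set_watermark() is called; output timestamps are ignored."""

  def __init__(self, watermark):
    self._watermark = watermark

  def observe_timestamp(self, timestamp):
    pass

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    return self._watermark

  def set_watermark(self, timestamp):
    # Assumed body: the quoted hunk is truncated before this line.
    self._watermark = timestamp


class OffsetTracker(object):
  """Hypothetical minimal tracker for the offset range [start, stop)."""

  def __init__(self, start, stop):
    self.start, self.stop = start, stop

  def try_claim(self, position):
    return self.start <= position < self.stop


def read_records(records, tracker, estimator):
  """records: (offset, event_time) pairs in offset order."""
  emitted = []
  for offset, event_time in records:
    if not tracker.try_claim(offset):
      break  # outside our restriction: must not move the watermark
    estimator.set_watermark(event_time)  # only after a successful claim
    emitted.append((offset, event_time))
  return emitted


records = [(0, 100), (1, 105), (2, 110), (3, 120)]
estimator = ManualWatermarkEstimator(None)
emitted = read_records(records, OffsetTracker(0, 3), estimator)
print(emitted)                        # [(0, 100), (1, 105), (2, 110)]
print(estimator.current_watermark())  # 110
```

Had `set_watermark` run before `try_claim`, the unclaimed record at offset 3 would have pushed the watermark to 120 even though this restriction never emits it.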
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=389652=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389652 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 19/Feb/20 21:49
Start Date: 19/Feb/20 21:49
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-588489213

Kindly pinging : ) If everything looks fine, I'll squash them into one commit before merging.

Issue Time Tracking
---
Worklog Id: (was: 389652)
Time Spent: 16h 50m (was: 16h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=387624=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387624 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Feb/20 19:15
Start Date: 14/Feb/20 19:15
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-586429919

@robertwb and @lukecwik, the PR is updated based on our discussion yesterday. PTAL : )

Issue Time Tracking
---
Worklog Id: (was: 387624)
Time Spent: 16h 40m (was: 16.5h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=387617=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387617 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Feb/20 19:08
Start Date: 14/Feb/20 19:08
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379596910

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -1035,6 +1284,8 @@ def process_outputs(self, windowed_input_element, results):
           windowed_value.windows *= len(windowed_input_element.windows)
       else:
         windowed_value = windowed_input_element.with_value(result)
+      if watermark_estimator:

Review comment:
The check is needed for non-SDF DoFns, since only an SDF has a `watermark_estimator`.

Issue Time Tracking
---
Worklog Id: (was: 387617)
Time Spent: 16.5h (was: 16h 20m)
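The `if watermark_estimator:` guard discussed above can be sketched in isolation. This is a simplified, hypothetical `process_outputs` (a `WindowedValue` namedtuple with only a value and a timestamp, no windows or tagged outputs), paired with a stand-in that mirrors the quoted `MonotonicWatermarkEstimator`; it is not the runner's actual code.

```python
from collections import namedtuple

# Hypothetical, simplified windowed value: just a value and a timestamp.
WindowedValue = namedtuple('WindowedValue', ['value', 'timestamp'])


class MonotonicWatermarkEstimator(object):
  """Stand-in mirroring the quoted MonotonicWatermarkEstimator."""

  def __init__(self, timestamp):
    self._watermark = timestamp

  def observe_timestamp(self, timestamp):
    if self._watermark is not None and timestamp < self._watermark:
      raise ValueError('output timestamps must increase monotonically')
    self._watermark = timestamp

  def current_watermark(self):
    return self._watermark


def process_outputs(windowed_input_element, results, watermark_estimator=None):
  outputs = []
  for result in results:
    if isinstance(result, WindowedValue):
      windowed_value = result
    else:
      # A bare value inherits the input element's timestamp.
      windowed_value = windowed_input_element._replace(value=result)
    # Only SDFs have an estimator; a regular DoFn passes None and skips this.
    if watermark_estimator is not None:
      watermark_estimator.observe_timestamp(windowed_value.timestamp)
    outputs.append(windowed_value)
  return outputs


element = WindowedValue('input', 10)
plain = process_outputs(element, ['a', WindowedValue('b', 12)])  # non-SDF
estimator = MonotonicWatermarkEstimator(None)
sdf = process_outputs(element, ['a', WindowedValue('b', 12)], estimator)
print(estimator.current_watermark())  # 12
```

The sketch uses `is not None` rather than plain truthiness so that an estimator class which happened to define `__len__` or `__bool__` could not be skipped by accident.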
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=387082=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387082 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Feb/20 03:05
Start Date: 14/Feb/20 03:05
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-586075968

Run PythonFormatter PreCommit

Issue Time Tracking
---
Worklog Id: (was: 387082)
Time Spent: 16h 20m (was: 16h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=387070=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387070 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Feb/20 02:44
Start Date: 14/Feb/20 02:44
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379226662

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -765,40 +802,54 @@ def _invoke_process_per_window(self,
       # ProcessSizedElementAndRestriction.
       self.threadsafe_restriction_tracker.check_done()
       deferred_status = self.threadsafe_restriction_tracker.deferred_status()
-      current_watermark = None
-      if self.watermark_estimator:
-        current_watermark = self.watermark_estimator.current_watermark()
       if deferred_status:
         deferred_restriction, deferred_timestamp = deferred_status
         element = windowed_value.value
         size = self.signature.get_restriction_provider().restriction_size(
             element, deferred_restriction)
-        residual_value = ((element, deferred_restriction), size)
-        return SplitResultResidual(
-            residual_value=windowed_value.with_value(residual_value),
-            current_watermark=current_watermark,
-            deferred_timestamp=deferred_timestamp)
-    return None
+        if self.threadsafe_watermark_estimator:
+          current_watermark = (
+              self.threadsafe_watermark_estimator.current_watermark())
+          estimator_state = (
+              self.threadsafe_watermark_estimator.get_estimator_state())
+          residual_value = (
+              (element, (deferred_restriction, estimator_state)), size)
+          return SplitResultResidual(
+              residual_value=windowed_value.with_value(residual_value),
+              current_watermark=current_watermark,
+              deferred_timestamp=deferred_timestamp)
+        else:

Review comment:
Yes.

Issue Time Tracking
---
Worklog Id: (was: 387070)
Time Spent: 16h (was: 15h 50m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=387071=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387071 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Feb/20 02:44
Start Date: 14/Feb/20 02:44
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379226674

##
File path: sdks/python/apache_beam/runners/common.py
##
@@ -321,6 +333,21 @@ def is_splittable_dofn(self):
     # type: () -> bool
     return self.get_restriction_provider() is not None

+  def get_restriction_coder(self):
+    """Get coder for a restriction when processing an SDF.
+
+    If the SDF uses an WatermarkEstimatorProvider, the restriction coder is a
+    tuple coder of (restriction_coder, estimator_state_coder). Otherwise,
+    returns the SDFs restriction_coder.
+    """
+    restriction_coder = None

Review comment:
Changed to `if ... else` block. Thanks!

Issue Time Tracking
---
Worklog Id: (was: 387071)
Time Spent: 16h 10m (was: 16h)
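The reply settles on an explicit `if ... else` in `get_restriction_coder` rather than initialising `restriction_coder = None` and overwriting it. A minimal sketch of that shape in plain Python follows; `TupleCoder` and `SdfSignature` are hypothetical stand-ins, not Beam's coder classes, and only the branching pattern and the (restriction_coder, estimator_state_coder) pairing come from the quoted docstring.

```python
class TupleCoder(object):
  """Hypothetical stand-in for a tuple coder; it only records its parts."""

  def __init__(self, components):
    self.components = list(components)


class SdfSignature(object):
  """Hypothetical, minimal signature with an optional estimator-state coder."""

  def __init__(self, restriction_coder, estimator_state_coder=None):
    self._restriction_coder = restriction_coder
    self._estimator_state_coder = estimator_state_coder

  def get_restriction_coder(self):
    """Return the coder for a restriction when processing an SDF.

    With a watermark estimator, the restriction travels together with the
    estimator state, so the coder covers (restriction, estimator_state).
    """
    if self._estimator_state_coder is not None:
      return TupleCoder(
          [self._restriction_coder, self._estimator_state_coder])
    else:
      return self._restriction_coder


# Usage with placeholder strings in place of real coders.
plain = SdfSignature('restriction_coder').get_restriction_coder()
paired = SdfSignature(
    'restriction_coder', 'estimator_state_coder').get_restriction_coder()
```

Each branch returns directly, so there is no intermediate `None` value that a later edit could accidentally leak.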
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=387068=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387068 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Feb/20 02:44
Start Date: 14/Feb/20 02:44
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379226640

##
File path: sdks/python/apache_beam/io/watermark_estimators.py
##
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all ouput records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new pair, the initial value is None. When
+    resuming processing, the initial timestamp will be the last reported
+    watermark.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '
+                         'timestamp to be increasing monotonically.')
+      self._watermark = timestamp
+
+  def current_watermark(self):
+    return self._watermark
+
+  def get_estimator_state(self):
+    return self._watermark
+
+
+class WalltimeWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which uses processing time as the estimated watermark.
+  """
+  def __init__(self, timestamp=None):
+    if timestamp:

Review comment:
Done. Thanks!

Issue Time Tracking
---
Worklog Id: (was: 387068)
Time Spent: 15h 40m (was: 15.5h)
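The `MonotonicWatermarkEstimator` quoted above treats the latest output timestamp as the watermark and rejects any timestamp that goes backwards. A standalone sketch of the same behaviour, assuming plain integers in place of Beam `Timestamp` objects and no `WatermarkEstimator` base class:

```python
class MonotonicWatermarkEstimator(object):
  """Sketch: the watermark is the most recent output timestamp.

  Plain integers stand in for Beam Timestamp objects (an assumption).
  """

  def __init__(self, timestamp=None):
    # None for a fresh element/restriction pair; the last reported
    # watermark when processing resumes from a checkpoint.
    self._watermark = timestamp

  def observe_timestamp(self, timestamp):
    if self._watermark is not None and timestamp < self._watermark:
      raise ValueError('A MonotonicWatermarkEstimator expects output '
                       'timestamps to increase monotonically.')
    self._watermark = timestamp

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    # The only state needed to resume is the current watermark.
    return self._watermark


estimator = MonotonicWatermarkEstimator()
estimator.observe_timestamp(5)
estimator.observe_timestamp(7)
resumed = MonotonicWatermarkEstimator(estimator.get_estimator_state())
```

Because `get_estimator_state()` returns the watermark itself, a resumed estimator enforces monotonicity against the last value reported before the checkpoint: `resumed.observe_timestamp(6)` raises `ValueError`.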
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=387069=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387069 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Feb/20 02:44
Start Date: 14/Feb/20 02:44
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379226651

##
File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
##
@@ -486,19 +486,28 @@ def process(
     _ = (p | beam.Create(data) | beam.ParDo(ExpandingStringsDoFn()))

   def test_sdf_with_watermark_tracking(self):
+    class ManualWatermarkEstimatorProvider(

Review comment:
Yes we should! Thanks.

Issue Time Tracking
---
Worklog Id: (was: 387069)
Time Spent: 15h 50m (was: 15h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=387024=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387024 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Feb/20 01:48
Start Date: 14/Feb/20 01:48
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-586058417

Re: https://github.com/apache/beam/pull/10375#discussion_r379142182
We still need to check whether `watermark_estimator` is None in the `_OutputProcessor.process_output().` because there are other non-sdf dofns calling this.

Issue Time Tracking
---
Worklog Id: (was: 387024)
Time Spent: 15.5h (was: 15h 20m)
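The point about non-SDF DoFns can be sketched as an output loop that reports timestamps only when an estimator was supplied. `process_outputs` here is a simplified, hypothetical stand-in for `_OutputProcessor.process_outputs`, `RecordingEstimator` exists only for illustration, and outputs are plain `(value, timestamp)` pairs rather than windowed values.

```python
class RecordingEstimator(object):
  """Hypothetical estimator that just remembers the timestamps it sees."""

  def __init__(self):
    self.observed = []

  def observe_timestamp(self, timestamp):
    self.observed.append(timestamp)


def process_outputs(results, watermark_estimator=None):
  """Emit (value, timestamp) outputs, reporting timestamps only for SDFs.

  Regular DoFns share this path and pass watermark_estimator=None.
  """
  emitted = []
  for value, timestamp in results:
    if watermark_estimator is not None:
      watermark_estimator.observe_timestamp(timestamp)
    emitted.append(value)
  return emitted
```

Using `is not None` rather than plain truthiness keeps the check correct even if an estimator class ever defines `__len__` or `__bool__`.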
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386964=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386964 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 23:55
Start Date: 13/Feb/20 23:55
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379142182

##
File path: sdks/python/apache_beam/runners/common.py
##
@@ -1035,6 +1284,8 @@ def process_outputs(self, windowed_input_element, results):
           windowed_value.windows *= len(windowed_input_element.windows)
       else:
         windowed_value = windowed_input_element.with_value(result)
+      if watermark_estimator:

Review comment:
Fix?

Issue Time Tracking
---
Worklog Id: (was: 386964)
Time Spent: 15h (was: 14h 50m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386967=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386967 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 23:55
Start Date: 13/Feb/20 23:55
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379184461

##
File path: sdks/python/apache_beam/io/watermark_estimators.py
##
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all ouput records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new pair, the initial value is None. When
+    resuming processing, the initial timestamp will be the last reported
+    watermark.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '
+                         'timestamp to be increasing monotonically.')
+      self._watermark = timestamp
+
+  def current_watermark(self):
+    return self._watermark
+
+  def get_estimator_state(self):
+    return self._watermark
+
+
+class WalltimeWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which uses processing time as the estimated watermark.
+  """
+  def __init__(self, timestamp=None):
+    if timestamp:

Review comment:
FWIW, This can be written `self._timestamp = timestamp or Timestamp.now()`.

Issue Time Tracking
---
Worklog Id: (was: 386967)
Time Spent: 15h 10m
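A sketch of a processing-time estimator written with the one-line default suggested in the review. `time.time()` stands in for `Timestamp.now()`; the injectable `clock` parameter and the never-decreasing `current_watermark` are assumptions made for this illustration and for testability, not details taken from the PR.

```python
import time


class WalltimeWatermarkEstimator(object):
  """Sketch: the watermark follows processing (wall-clock) time.

  time.time() stands in for Timestamp.now(); `clock` is injectable so the
  behaviour can be tested deterministically (both are assumptions).
  """

  def __init__(self, timestamp=None, clock=time.time):
    self._clock = clock
    # The one-liner from the review: resume from a saved watermark if one
    # is given, otherwise start at "now".
    self._timestamp = timestamp or self._clock()

  def observe_timestamp(self, timestamp):
    # Output timestamps do not influence a processing-time watermark.
    pass

  def current_watermark(self):
    # Advance with the clock but never move backwards.
    self._timestamp = max(self._timestamp, self._clock())
    return self._timestamp

  def get_estimator_state(self):
    return self._timestamp
```

One caveat with `timestamp or ...`: it replaces any falsy value, so with plain numbers a resumed watermark of `0` would be discarded. Writing `timestamp if timestamp is not None else clock()` avoids that if it matters.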
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386969=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386969 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 23:55
Start Date: 13/Feb/20 23:55
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379142973

##
File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
##
@@ -486,19 +486,28 @@ def process(
     _ = (p | beam.Create(data) | beam.ParDo(ExpandingStringsDoFn()))

   def test_sdf_with_watermark_tracking(self):
+    class ManualWatermarkEstimatorProvider(

Review comment:
Hmm... should we provide these as well? (Perhaps via a static default_provider() method on the corresponding watermark estimators?)

Issue Time Tracking
---
Worklog Id: (was: 386969)
Time Spent: 15h 20m (was: 15h 10m)
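The suggestion of a static `default_provider()` could look roughly like this. Everything here is illustrative: `ManualWatermarkEstimator`, `_DefaultProvider`, and the method names `initial_estimator_state` and `create_watermark_estimator` are hypothetical. They are modeled on the issue's idea of a provider that creates one estimator per element/restriction pair and restores it from saved state, so check the released SDK for the actual API.

```python
class ManualWatermarkEstimator(object):
  """Hypothetical estimator whose watermark the DoFn sets explicitly."""

  def __init__(self, watermark=None):
    self._watermark = watermark

  def set_watermark(self, timestamp):
    self._watermark = timestamp

  def observe_timestamp(self, timestamp):
    # Output timestamps are ignored; only set_watermark() advances it.
    pass

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    return self._watermark

  @staticmethod
  def default_provider():
    """Return a provider that builds this estimator from saved state."""
    return _DefaultProvider(ManualWatermarkEstimator)


class _DefaultProvider(object):
  """Hypothetical generic provider: one estimator per element/restriction."""

  def __init__(self, estimator_cls):
    self._estimator_cls = estimator_cls

  def initial_estimator_state(self, element, restriction):
    # A fresh element/restriction pair starts without a watermark.
    return None

  def create_watermark_estimator(self, estimator_state):
    return self._estimator_cls(estimator_state)
```

The design choice is that each estimator class ships a ready-made provider, so test code like the `ManualWatermarkEstimatorProvider` above would not need to hand-write one.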
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386968=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386968 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 23:55
Start Date: 13/Feb/20 23:55
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379141998

##
File path: sdks/python/apache_beam/runners/common.py
##
@@ -765,40 +802,54 @@ def _invoke_process_per_window(self,
       # ProcessSizedElementAndRestriction.
       self.threadsafe_restriction_tracker.check_done()
       deferred_status = self.threadsafe_restriction_tracker.deferred_status()
-      current_watermark = None
-      if self.watermark_estimator:
-        current_watermark = self.watermark_estimator.current_watermark()
       if deferred_status:
         deferred_restriction, deferred_timestamp = deferred_status
         element = windowed_value.value
         size = self.signature.get_restriction_provider().restriction_size(
             element, deferred_restriction)
-        residual_value = ((element, deferred_restriction), size)
-        return SplitResultResidual(
-            residual_value=windowed_value.with_value(residual_value),
-            current_watermark=current_watermark,
-            deferred_timestamp=deferred_timestamp)
-    return None
+        if self.threadsafe_watermark_estimator:
+          current_watermark = (
+              self.threadsafe_watermark_estimator.current_watermark())
+          estimator_state = (
+              self.threadsafe_watermark_estimator.get_estimator_state())
+          residual_value = (
+              (element, (deferred_restriction, estimator_state)), size)
+          return SplitResultResidual(
+              residual_value=windowed_value.with_value(residual_value),
+              current_watermark=current_watermark,
+              deferred_timestamp=deferred_timestamp)
+        else:

Review comment:
I think this can be removed as self.threadsafe_watermark_estimator will always be something, right?

Issue Time Tracking
---
Worklog Id: (was: 386968)
Time Spent: 15h 20m (was: 15h 10m)
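If an estimator is always present, as suggested here, the residual can be built on a single path. The following is a minimal sketch, assuming a hypothetical `SplitResultResidual` namedtuple, a hypothetical `FixedEstimator`, a caller-supplied `restriction_size` function, and no windowing (the quoted code also wraps the value with `windowed_value.with_value`).

```python
from collections import namedtuple

# Hypothetical stand-in for the runner's residual record.
SplitResultResidual = namedtuple(
    'SplitResultResidual',
    ['residual_value', 'current_watermark', 'deferred_timestamp'])


class FixedEstimator(object):
  """Hypothetical estimator that always reports the same watermark."""

  def __init__(self, watermark):
    self._watermark = watermark

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    return self._watermark


def build_residual(element, deferred_status, estimator, restriction_size):
  """Pack a deferred restriction into a residual on a single code path."""
  if not deferred_status:
    return None
  deferred_restriction, deferred_timestamp = deferred_status
  size = restriction_size(element, deferred_restriction)
  # Restriction and estimator state travel together, matching the
  # (restriction, estimator_state) coder pairing.
  estimator_state = estimator.get_estimator_state()
  residual_value = ((element, (deferred_restriction, estimator_state)), size)
  return SplitResultResidual(
      residual_value=residual_value,
      current_watermark=estimator.current_watermark(),
      deferred_timestamp=deferred_timestamp)
```

With no `else` branch, the shape of `residual_value` is the same for every SDF, which is what lets a single tuple coder decode it.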
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386965=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386965 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 23:55
Start Date: 13/Feb/20 23:55
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379138864

##
File path: sdks/python/apache_beam/runners/common.py
##
@@ -527,11 +734,11 @@ def __init__(self,
         signature.is_stateful_dofn())
     self.user_state_context = user_state_context
     self.is_splittable = signature.is_splittable_dofn()
-    self.watermark_estimator = self.signature.get_watermark_estimator()
-    self.watermark_estimator_param = (
-        self.signature.process_method.watermark_estimator_arg_name
-        if self.watermark_estimator else None)
-    self.threadsafe_restriction_tracker = None  # type: Optional[iobase.ThreadsafeRestrictionTracker]
+    self.threadsafe_restriction_tracker = None
+    self.threadsafe_watermark_estimator = None
+    # The lock which guarantee synchronization for both
+    # ThreadsafeRestrictionTracker and ThreadsafeWatermarkEstimator.
+    self._synchronized_lock = threading.Lock()

Review comment:
Would an in-person discussion on this be helpful @lukecwik @boyuanzz ?

Issue Time Tracking
---
Worklog Id: (was: 386965)
Time Spent: 15h 10m (was: 15h)
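The comment in the quoted `__init__` describes one lock shared by the threadsafe restriction tracker and the threadsafe watermark estimator. A minimal sketch of why that matters, using hypothetical `ThreadsafeTracker` and `ThreadsafeEstimator` classes over an integer offset range: a checkpoint that holds the shared lock once sees a restriction and a watermark that belong together.

```python
import threading


class ThreadsafeTracker(object):
  """Hypothetical tracker over the offset range [start, stop)."""

  def __init__(self, start, stop, lock):
    self._lock = lock
    self.last_claimed = start - 1  # nothing claimed yet
    self.stop = stop

  def try_claim(self, position):
    with self._lock:
      if position >= self.stop:
        return False
      self.last_claimed = position
      return True


class ThreadsafeEstimator(object):
  """Hypothetical estimator recording the latest output timestamp."""

  def __init__(self, lock):
    self._lock = lock
    self.watermark = None

  def observe_timestamp(self, timestamp):
    with self._lock:
      self.watermark = timestamp


def checkpoint(tracker, estimator, lock):
  """Split off the unclaimed remainder and read the watermark atomically."""
  with lock:
    split = tracker.last_claimed + 1
    residual = (split, tracker.stop)
    tracker.stop = split  # the primary now ends at the split point
    return residual, estimator.watermark
```

`checkpoint` reads the wrappers' fields directly instead of calling their locking methods; with a non-reentrant `threading.Lock`, calling `try_claim` from inside `with lock:` would deadlock.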
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386963=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386963 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 23:55
Start Date: 13/Feb/20 23:55
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379136355

##
File path: sdks/python/apache_beam/runners/common.py
##
@@ -321,6 +333,21 @@ def is_splittable_dofn(self):
     # type: () -> bool
     return self.get_restriction_provider() is not None

+  def get_restriction_coder(self):
+    """Get coder for a restriction when processing an SDF.
+
+    If the SDF uses an WatermarkEstimatorProvider, the restriction coder is a
+    tuple coder of (restriction_coder, estimator_state_coder). Otherwise,
+    returns the SDFs restriction_coder.
+    """
+    restriction_coder = None

Review comment:
Nit: https://engdoc.corp.google.com/eng/doc/devguide/py/totw/026.md

Issue Time Tracking
---
Worklog Id: (was: 386963)
Time Spent: 15h (was: 14h 50m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386966=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386966 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 23:55
Start Date: 13/Feb/20 23:55
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379136495

##
File path: sdks/python/apache_beam/runners/common.py
##
@@ -323,6 +330,27 @@ def is_splittable_dofn(self):
     # type: () -> bool
     return self.get_restriction_provider() is not None

+  def is_tracking_watermark(self):
+    return self.get_watermark_estimator_provider() is not None
+
+  def get_restriction_coder(self):
+    """Get coder for a restriction when processing an SDF.
+
+    If the SDF uses an WatermarkEstimatorProvider, the restriction coder is a

Review comment:
This code assumes a watermark estimator is always available, if so update the docstring (and simplify the code elsewhere to not have a different branch for the no-watermark-estimator case). (Yes, this should be feasible for other SDKs as well, but each SDK can do their own thing.)

Issue Time Tracking
---
Worklog Id: (was: 386966)
Time Spent: 15h 10m (was: 15h)
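One way to make "a watermark estimator is always available" hold is to substitute an estimator that does nothing when the DoFn declares no provider. This is a sketch under that assumption. `NoOpWatermarkEstimator` here is an illustrative class, not the SDK's implementation, and `create_estimator` is hypothetical, with the provider modeled as a plain callable from saved state to estimator.

```python
class NoOpWatermarkEstimator(object):
  """Sketch: an estimator that tracks nothing and reports no watermark."""

  def observe_timestamp(self, timestamp):
    pass

  def current_watermark(self):
    return None

  def get_estimator_state(self):
    return None


def create_estimator(provider, estimator_state):
  """Hypothetical factory: fall back to the no-op estimator when the DoFn
  declares no provider, so callers never branch on "no estimator"."""
  if provider is None:
    return NoOpWatermarkEstimator()
  return provider(estimator_state)
```

With this default in place, code such as the residual construction can call `get_estimator_state()` unconditionally, which is the simplification the review comment asks for.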
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386891&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386891 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 21:44
Start Date: 13/Feb/20 21:44
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379135625

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -503,6 +534,182 @@ def invoke_process(self, windowed_value,
           self.process_method(windowed_value.value))
 
 
+class _ThreadsafeWatermarkEstimator(object):

Review comment:
It's compiled due to https://github.com/apache/beam/blob/release-2.19.0/sdks/python/setup.py#L263 on install.

Issue Time Tracking
---
Worklog Id: (was: 386891)
Time Spent: 14h 50m (was: 14h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386892&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386892 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 21:44
Start Date: 13/Feb/20 21:44
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r379133856

## File path: sdks/python/apache_beam/io/watermark_estimators.py ##
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all ouput records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new pair, the initial value is None. When
+    resuming processing, the initial timestamp will be the last reported
+    watermark.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '

Review comment:
The problem is that the user *can't* do (2) or (3) themselves. Let's at least add a JIRA to consider making this configurable (either here or as alternative estimators) but not block on this. (1) is certainly the safest default.

Issue Time Tracking
---
Worklog Id: (was: 386892)
Time Spent: 14h 50m (was: 14h 40m)
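Option (1), failing when an output timestamp moves backwards, can be sketched as a standalone class. It mirrors the `observe_timestamp` check visible in the diff. The branch that advances the watermark and the `current_watermark`/`get_estimator_state` methods are assumptions, since the hunk is cut off at the `raise`. Plain integers stand in for Beam `Timestamp` values, and the error message is illustrative rather than the PR's wording.

```python
class MonotonicWatermarkEstimatorSketch(object):
  """Assumes output timestamps never decrease; fails loudly if they do."""

  def __init__(self, timestamp=None):
    # None for a new pair; the last reported watermark when resuming.
    self._watermark = timestamp

  def observe_timestamp(self, timestamp):
    if self._watermark is not None and timestamp < self._watermark:
      raise ValueError(
          'Expected monotonically increasing timestamps, got %r after %r' %
          (timestamp, self._watermark))
    self._watermark = timestamp

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    return self._watermark


if __name__ == '__main__':
  estimator = MonotonicWatermarkEstimatorSketch()
  for ts in (1, 4, 4, 7):
    estimator.observe_timestamp(ts)
  print(estimator.current_watermark())  # 7
  try:
    estimator.observe_timestamp(2)
  except ValueError as e:
    print('rejected:', e)
```

Equal timestamps are accepted; only a strictly earlier one is rejected. Any configurable alternative discussed in the review would presumably change only what happens inside that `if` branch.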
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386361&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386361 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Feb/20 02:31
Start Date: 13/Feb/20 02:31
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-585519066

Hi @robertwb @lukecwik, most comments should be addressed by the latest commits; please take another look. Thanks for your help!
I didn't put `watermark_estimator` into `OutputProcessor` because the `OutputProcessor` is created per bundle, not per element.

Issue Time Tracking
---
Worklog Id: (was: 386361)
Time Spent: 14h 40m (was: 14.5h)
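The per-bundle versus per-element reasoning above can be illustrated with a small sketch. An object created once per bundle is shared by every element in that bundle, so an estimator stored on it would carry one element's watermark into the next. Here the estimator is instead created per element and passed alongside the outputs. `BundleOutputProcessorSketch`, `_MaxEstimator` and `process_bundle` are hypothetical names; they are not Beam's `OutputProcessor` API.

```python
class _MaxEstimator(object):
  """Hypothetical per-element estimator tracking the max output timestamp."""

  def __init__(self):
    self._watermark = None

  def observe_timestamp(self, timestamp):
    if self._watermark is None or timestamp > self._watermark:
      self._watermark = timestamp

  def current_watermark(self):
    return self._watermark


class BundleOutputProcessorSketch(object):
  """Created once per bundle, so it holds no per-element state."""

  def __init__(self):
    self.emitted = []

  def process_outputs(self, outputs, watermark_estimator=None):
    for value, timestamp in outputs:
      if watermark_estimator is not None:
        watermark_estimator.observe_timestamp(timestamp)
      self.emitted.append(value)


def process_bundle(elements):
  processor = BundleOutputProcessorSketch()  # One per bundle.
  watermarks = []
  for outputs in elements:
    estimator = _MaxEstimator()  # A fresh estimator for each element.
    processor.process_outputs(outputs, estimator)
    watermarks.append(estimator.current_watermark())
  return processor.emitted, watermarks


if __name__ == '__main__':
  print(process_bundle([[('a', 5), ('b', 7)], [('c', 3)]]))
  # (['a', 'b', 'c'], [7, 3])
```

If the estimator lived on the processor instead, the second element would report 7 rather than 3.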
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=386172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386172 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 12/Feb/20 19:34
Start Date: 12/Feb/20 19:34
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r378468113

## File path: sdks/python/apache_beam/io/iobase.py ##
@@ -1238,128 +1235,38 @@ def try_claim(self, position):
     raise NotImplementedError
 
 
-class ThreadsafeRestrictionTracker(object):

Review comment:
Finished rebasing on top of the master branch. Filed https://issues.apache.org/jira/browse/BEAM-9296 for adding type check.

Issue Time Tracking
---
Worklog Id: (was: 386172)
Time Spent: 14.5h (was: 14h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=385470&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385470 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 21:28
Start Date: 11/Feb/20 21:28
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#issuecomment-584861527

All tests passed. I'm going to merge the PR and work on integrating https://github.com/apache/beam/pull/10375. Thanks, everyone!

Issue Time Tracking
---
Worklog Id: (was: 385470)
Time Spent: 14h 10m (was: 14h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=385471&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385471 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 21:28
Start Date: 11/Feb/20 21:28
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802

Issue Time Tracking
---
Worklog Id: (was: 385471)
Time Spent: 14h 20m (was: 14h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=385428&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385428 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 20:23
Start Date: 11/Feb/20 20:23
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#issuecomment-584833264

Run PythonLint PreCommit

Issue Time Tracking
---
Worklog Id: (was: 385428)
Time Spent: 14h (was: 13h 50m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=385402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385402 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 19:43
Start Date: 11/Feb/20 19:43
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#discussion_r377858002

## File path: sdks/python/apache_beam/runners/sdf_utils.py ##
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Common utility class to help SDK harness to execute an SDF.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import logging
+import threading
+from builtins import object
+from collections import namedtuple
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import Optional
+from typing import Tuple
+
+from apache_beam.utils import timestamp
+
+if TYPE_CHECKING:
+  from apache_beam.io.iobase import RestrictionTracker
+  from apache_beam.utils.timestamp import Timestamp
+
+_LOGGER = logging.getLogger(__name__)
+
+
+SplitResultPrimary = namedtuple(
+    'SplitResultPrimary', 'windowed_value')
+
+SplitResultResidual = namedtuple(
+    'SplitResultResidual',
+    'windowed_value current_watermark deferred_timestamp')
+
+class ThreadsafeRestrictionTracker(object):

Review comment:
Created a JIRA here: https://issues.apache.org/jira/browse/BEAM-9296. I'll follow up once the API changes are finished.

Issue Time Tracking
---
Worklog Id: (was: 385402)
Time Spent: 13h 50m (was: 13h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=385345&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385345 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 18:29
Start Date: 11/Feb/20 18:29
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#issuecomment-584782749

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 385345)
Time Spent: 13h 20m (was: 13h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=385346&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385346 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 18:29
Start Date: 11/Feb/20 18:29
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#issuecomment-584782789

Run PythonFormatter PreCommit

Issue Time Tracking
---
Worklog Id: (was: 385346)
Time Spent: 13.5h (was: 13h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=385347&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-385347 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 18:29
Start Date: 11/Feb/20 18:29
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#issuecomment-584782866

Run PythonLint PreCommit

Issue Time Tracking
---
Worklog Id: (was: 385347)
Time Spent: 13h 40m (was: 13.5h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384947&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384947 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 02:46
Start Date: 11/Feb/20 02:46
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#issuecomment-584457498

These failed tests actually pass on my local machine. It's quite possible that we have some infra problems with Jenkins.

Issue Time Tracking
---
Worklog Id: (was: 384947)
Time Spent: 13h 10m (was: 13h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384941&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384941 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 02:27
Start Date: 11/Feb/20 02:27
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#issuecomment-584453992

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 384941)
Time Spent: 13h (was: 12h 50m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384906&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384906 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 11/Feb/20 01:24
Start Date: 11/Feb/20 01:24
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#discussion_r377410314

## File path: sdks/python/apache_beam/runners/sdf_utils.py ##
@@ -0,0 +1,176 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Common utility class to help SDK harness to execute an SDF.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import logging
+import threading
+from builtins import object
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import NamedTuple
+from typing import Optional
+from typing import Tuple
+from apache_beam.utils.windowed_value import WindowedValue
+
+from apache_beam.utils import timestamp
+
+if TYPE_CHECKING:
+  from apache_beam.io.iobase import RestrictionTracker
+  from apache_beam.utils.timestamp import Timestamp
+
+_LOGGER = logging.getLogger(__name__)
+
+SplitResultPrimary = NamedTuple(
+    'SplitResultPrimary', [('windowed_value', WindowedValue)])

Review comment:
Thanks! I like the idea of `primary_value` and `residual_value`.

Issue Time Tracking
---
Worklog Id: (was: 384906)
Time Spent: 12h 50m (was: 12h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384861=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384861 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 11/Feb/20 00:19 Start Date: 11/Feb/20 00:19 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377393414 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF.
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value')
Review comment: Nothing comes to mind immediately as to what fields we'd want to add here (though originally even the residual didn't have anything extra). Mostly I like the consistency, so I'd lean towards keeping it as is.
Issue Time Tracking --- Worklog Id: (was: 384861) Time Spent: 12h 40m (was: 12.5h)
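The revisions quoted in this thread define `SplitResultPrimary` two ways: with `collections.namedtuple`, and later with `typing.NamedTuple` and a `WindowedValue` field type. The minimal sketch below shows that both produce the same runtime shape, a one-element tuple with one named field, while only the `typing` form records a field type that a checker such as mypy can use. `FakeWindowedValue` is a hypothetical stand-in for Beam's `WindowedValue`, so the snippet runs without Beam installed.

```python
from collections import namedtuple
from typing import NamedTuple, get_type_hints


class FakeWindowedValue(object):
    """Hypothetical stand-in for Beam's WindowedValue (no Beam install needed)."""

    def __init__(self, value):
        self.value = value


# Untyped form, as in the collections.namedtuple revision.
UntypedPrimary = namedtuple('UntypedPrimary', 'windowed_value')

# Typed form, as in the typing.NamedTuple revision; the field type is recorded.
TypedPrimary = NamedTuple(
    'TypedPrimary', [('windowed_value', FakeWindowedValue)])

wv = FakeWindowedValue('element')
untyped = UntypedPrimary(wv)
typed = TypedPrimary(windowed_value=wv)

# Same runtime shape: a one-element tuple with the same field name.
same_fields = untyped._fields == typed._fields

# Only the typed form exposes a field type to static checkers.
untyped_hints = get_type_hints(UntypedPrimary)
typed_hints = get_type_hints(TypedPrimary)
```

Because the two forms are interchangeable at runtime, switching from one to the other does not affect callers that access `.windowed_value` or unpack the tuple.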
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384860 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 11/Feb/20 00:19 Start Date: 11/Feb/20 00:19 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377392592 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF.
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp') + +class ThreadsafeRestrictionTracker(object):
Review comment: I agree that adding types would be nice, but is probably out of scope. (It would make sense to add types on the base class at the same time.)
Issue Time Tracking --- Worklog Id: (was: 384860) Time Spent: 12.5h (was: 12h 20m)
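The comment above suggests that types, if added, belong on the base tracker class and the wrapper together. The minimal sketch below shows what that pairing could look like with Python 3 annotations. `SimpleTracker` and `OffsetTracker` are hypothetical and heavily simplified; they are not Beam's `RestrictionTracker`, which has more methods and a different contract. The wrapper routes every call through one `threading.RLock`, which is the guarantee the quoted `ThreadsafeRestrictionTracker` docstring describes.

```python
import threading
from abc import ABC, abstractmethod
from typing import Generic, List, Tuple, TypeVar

RestrictionT = TypeVar('RestrictionT')
PositionT = TypeVar('PositionT')


class SimpleTracker(ABC, Generic[RestrictionT, PositionT]):
    """Hypothetical, simplified tracker interface; not Beam's RestrictionTracker."""

    @abstractmethod
    def current_restriction(self) -> RestrictionT:
        ...

    @abstractmethod
    def try_claim(self, position: PositionT) -> bool:
        ...


class OffsetTracker(SimpleTracker[Tuple[int, int], int]):
    """Claims offsets of the half-open range [start, stop) in increasing order."""

    def __init__(self, start: int, stop: int) -> None:
        self._start = start
        self._stop = stop
        self._last_claimed = start - 1

    def current_restriction(self) -> Tuple[int, int]:
        return (self._start, self._stop)

    def try_claim(self, position: int) -> bool:
        # This read-check-write is not atomic on its own, hence the wrapper.
        if position <= self._last_claimed or position >= self._stop:
            return False
        self._last_claimed = position
        return True


class ThreadsafeTracker(Generic[RestrictionT, PositionT]):
    """Runs every call on the wrapped tracker while holding one re-entrant lock."""

    def __init__(self, tracker: SimpleTracker[RestrictionT, PositionT]) -> None:
        self._tracker = tracker
        self._lock = threading.RLock()

    def current_restriction(self) -> RestrictionT:
        with self._lock:
            return self._tracker.current_restriction()

    def try_claim(self, position: PositionT) -> bool:
        with self._lock:
            return self._tracker.try_claim(position)


# Four threads race to claim the same offsets; the lock keeps claims unique.
shared = ThreadsafeTracker(OffsetTracker(0, 1000))
claimed = []  # type: List[int]


def claim_all() -> None:
    for offset in range(1000):
        if shared.try_claim(offset):
            claimed.append(offset)


threads = [threading.Thread(target=claim_all) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because the wrapper is generic over the same type variables as the base class, a checker can confirm that `try_claim` on the wrapper accepts only the position type of the tracker it wraps.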
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384858=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384858 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 11/Feb/20 00:19 Start Date: 11/Feb/20 00:19 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377390532 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,176 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF.
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from typing import TYPE_CHECKING +from typing import Any +from typing import NamedTuple +from typing import Optional +from typing import Tuple +from apache_beam.utils.windowed_value import WindowedValue + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + +SplitResultPrimary = NamedTuple( +'SplitResultPrimary', [('windowed_value', WindowedValue)])
Review comment: Maybe call this field `primary[_value]` and the other `residual[_value]`?
Issue Time Tracking --- Worklog Id: (was: 384858) Time Spent: 12.5h (was: 12h 20m)
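The suggestion above is to name each field after its role; a later comment in this archive agrees to `primary_value` and `residual_value`. The sketch below shows how the two result types could read after such a rename. It rests on several assumptions: the residual's other two fields are copied from the earlier `namedtuple` revision, `Any` stands in for `WindowedValue` and Beam's timestamp types, and `describe_split` is a hypothetical helper. It does not claim to be the code that was merged.

```python
from typing import Any, NamedTuple, Optional

# Field names follow the review suggestion; Any is a placeholder for
# WindowedValue and for Beam's timestamp types.
SplitResultPrimary = NamedTuple(
    'SplitResultPrimary', [('primary_value', Any)])

SplitResultResidual = NamedTuple(
    'SplitResultResidual',
    [('residual_value', Any),
     ('current_watermark', Optional[Any]),
     ('deferred_timestamp', Optional[Any])])


def describe_split(primary, residual):
    # type: (SplitResultPrimary, SplitResultResidual) -> str
    """Hypothetical helper that reads each part by its role-specific name."""
    return 'primary=%r residual=%r deferred=%r' % (
        primary.primary_value, residual.residual_value,
        residual.deferred_timestamp)


summary = describe_split(
    SplitResultPrimary(primary_value=('elem', (0, 50))),
    SplitResultResidual(
        residual_value=('elem', (50, 100)),
        current_watermark=None,
        deferred_timestamp=None))
print(summary)
```

With role-specific names, a call site that passes a primary where a residual is expected reads as wrong at a glance, which a shared `windowed_value` name does not provide.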
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384859=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384859 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 11/Feb/20 00:19 Start Date: 11/Feb/20 00:19 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377389487 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -906,26 +906,25 @@ def delayed_bundle_application(self, # type: (...) -> beam_fn_api_pb2.DelayedBundleApplication assert op.input_info is not None # TODO(SDF): For non-root nodes, need main_input_coder + residual_coder. -((element_and_restriction, output_watermark), - deferred_watermark) = deferred_remainder -if deferred_watermark: - assert isinstance(deferred_watermark, timestamp.Duration) +element_and_restriction = deferred_remainder.windowed_value
Review comment: You can still use tuple unpacking here, e.g. `(element_and_restriction, deferred_timestamp, current_watermark) = deferred_remainder`
Issue Time Tracking --- Worklog Id: (was: 384859) Time Spent: 12.5h (was: 12h 20m)
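Tuple unpacking of a namedtuple is positional: the targets on the left are bound in field order, not by name. In the `namedtuple` revision of `SplitResultResidual` quoted elsewhere in this thread, `current_watermark` comes before `deferred_timestamp`. If that order still holds, the targets need the same order, because swapping them raises no error and simply exchanges the values. The small sketch below assumes that earlier field order and uses plain strings as placeholder values.

```python
from collections import namedtuple

# Field order copied from the earlier revision quoted in this thread; this is
# an assumption about the order in effect when the comment was written.
SplitResultResidual = namedtuple(
    'SplitResultResidual',
    'windowed_value current_watermark deferred_timestamp')

deferred_remainder = SplitResultResidual(
    windowed_value='element_and_restriction',
    current_watermark='watermark',
    deferred_timestamp='resume_at')

# Unpacking binds by position, so the targets must follow the field order.
(element_and_restriction, current_watermark,
 deferred_timestamp) = deferred_remainder

# Swapping the names on the left silently swaps the values; nothing fails.
_, swapped_deferred, swapped_watermark = deferred_remainder
# swapped_deferred now holds the current_watermark value.
```

Accessing fields by attribute, as in `deferred_remainder.windowed_value`, avoids this ordering hazard at the cost of one line per field.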
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384713=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384713 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 20:37 Start Date: 10/Feb/20 20:37 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377302328 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF.
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp')
Review comment: Changed in https://github.com/apache/beam/pull/10802/commits/8aa9821439fbb941c83c61e34b52aedc1404dacc
Issue Time Tracking --- Worklog Id: (was: 384713) Time Spent: 12h 20m (was: 12h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384695=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384695 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 20:06 Start Date: 10/Feb/20 20:06 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377288081 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp') + +class ThreadsafeRestrictionTracker(object): + """A thread-safe wrapper which wraps a `RestritionTracker`. + + This wrapper guarantees synchronization of modifying restrictions across + multi-thread. + """ + def __init__(self, restriction_tracker): +# type: (RestrictionTracker) -> None +from apache_beam.io.iobase import RestrictionTracker +if not isinstance(restriction_tracker, RestrictionTracker): + raise ValueError( + 'Initialize ThreadsafeRestrictionTracker requires' + 'RestrictionTracker.') +self._restriction_tracker = restriction_tracker +# Records an absolute timestamp when defer_remainder is called. +self._deferred_timestamp = None +self._lock = threading.RLock() +self._deferred_residual = None +self._deferred_watermark = None + + def current_restriction(self): +with self._lock: + return self._restriction_tracker.current_restriction() + + def try_claim(self, position): +with self._lock: + return self._restriction_tracker.try_claim(position) + + def defer_remainder(self, deferred_time=None): +"""Performs self-checkpoint on current processing restriction with an +expected resuming time. + +Self-checkpoint could happen during processing elements. 
When executing an +DoFn.process(), you may want to stop processing an element and resuming +later if current element has been processed quit a long time or you also +want to have some outputs from other elements. ``defer_remainder()`` can be +called on per element if needed. + +Args: + deferred_time: A relative ``timestamp.Duration`` that indicates the ideal + time gap between now and resuming, or an absolute ``timestamp.Timestamp`` + for resuming execution time. If the time_delay is None, the deferred work + will be executed as soon as possible. +""" + +# Record current time for calculating deferred_time later. +self._deferred_timestamp = timestamp.Timestamp.now() +if (deferred_time and not isinstance(deferred_time, timestamp.Duration) and +not isinstance(deferred_time, timestamp.Timestamp)): + raise ValueError( + 'The timestamp of deter_remainder() should be a ' + 'Duration or a Timestamp, or None.') +self._deferred_watermark = deferred_time +checkpoint = self.try_split(0) +if checkpoint: + _, self._deferred_residual = checkpoint + + def check_done(self): +with self._lock: + return self._restriction_tracker.check_done() + + def current_progress(self): +with self._lock: + return self._restriction_tracker.current_progress() + + def try_split(self, fraction_of_remainder): +with
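The quoted `defer_remainder()` accepts a relative `timestamp.Duration`, an absolute `timestamp.Timestamp`, or `None`, and records the current time so that a relative delay can be resolved later. The minimal sketch below shows that resolution step. It uses `datetime.timedelta` and `datetime.datetime` as hypothetical stand-ins for Beam's `Duration` and `Timestamp`, and `resolve_resume_time` is a hypothetical name. It checks `is None` rather than truthiness, so a falsy value of the wrong type, such as `0`, is rejected instead of skipping validation.

```python
import datetime
from typing import Optional, Union

# Hypothetical stand-ins: timedelta plays a relative Duration and datetime an
# absolute Timestamp.
DeferredTime = Union[datetime.timedelta, datetime.datetime]


def resolve_resume_time(deferred_time, now):
    # type: (Optional[DeferredTime], datetime.datetime) -> datetime.datetime
    """Returns the absolute time at which deferred work should resume."""
    if deferred_time is None:
        # No preference given: resume as soon as possible.
        return now
    if isinstance(deferred_time, datetime.timedelta):
        # Relative delay, measured from when the remainder was deferred.
        return now + deferred_time
    if isinstance(deferred_time, datetime.datetime):
        # Already an absolute resume time.
        return deferred_time
    raise ValueError(
        'deferred_time should be a timedelta, a datetime, or None; got %r'
        % (deferred_time,))
```

Resolving everything to one absolute time in a single place keeps the caller from having to check which of the two forms it received.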
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384691=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384691 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:56 Start Date: 10/Feb/20 19:56 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377283151 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF.
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp') + +class ThreadsafeRestrictionTracker(object):
Review comment: sounds good.

> And I'm curious how can we test the typing? Do we run tox -e mypy?

`tox -e py37-mypy`

Note that there's an issue with the tox config that's preventing colored output in mypy. So you may want to do (from a python3 venv):

```
pip install mypy
cd sdks/python
mypy
```

There's also a fair bit of mypy error noise still since not all the PRs have been merged yet, so you'll want to focus on file paths relevant to your change.
Issue Time Tracking --- Worklog Id: (was: 384691) Time Spent: 12h (was: 11h 50m)
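The quoted code annotates with `# type:` comments, for example `# type: (RestrictionTracker) -> None`, which is the Python 2 compatible syntax that mypy reads. The small sketch below shows the kind of mistake running `mypy` would report. The function `deferred_seconds` is hypothetical. The snippet runs normally under plain Python, because type comments are ignored at runtime.

```python
from typing import Optional


def deferred_seconds(delay_seconds=None):
    # type: (Optional[float]) -> float
    """Hypothetical helper: returns the delay to apply, None meaning 'now'."""
    if delay_seconds is None:
        return 0.0
    # mypy narrows Optional[float] to float after the None check above.
    return delay_seconds


# Type-checks cleanly.
ok = deferred_seconds(1.5)

# mypy reports an incompatible argument type here (str is not Optional[float]);
# at runtime the comment is ignored and the string is returned unchanged.
not_ok = deferred_seconds('1.5')
```

This is why the mypy run is a separate step from the unit tests: a call like the last one passes at runtime and only the checker flags it.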
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384686=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384686 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:45 Start Date: 10/Feb/20 19:45 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377277572 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF.
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value')
Review comment: I'll leave the final call on that up to @robertwb, but my instinct is to use a type alias here, unless there's some need to do tuple operations on this type. I'm not sure of the exact type of windowed_value here, but a type alias would be something like:

```python
SplitResultPrimary = WindowedValue
```

Issue Time Tracking --- Worklog Id: (was: 384686) Time Spent: 11h 40m (was: 11.5h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384687=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384687 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:45 Start Date: 10/Feb/20 19:45 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377277572 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value')
Review comment: I'll leave the final call on that up to @robertwb, but my instinct is to use a type alias here, unless there's some need to do tuple operations on this type, or it's likely that it will grow more members in the future. I'm not sure of the exact type of windowed_value here, but a type alias would be something like:
```python
SplitResultPrimary = WindowedValue
```
Issue Time Tracking --- Worklog Id: (was: 384687) Time Spent: 11h 50m (was: 11h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384684=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384684 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:43 Start Date: 10/Feb/20 19:43 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377276494 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp')
Review comment: You can do so safely. `typing` is a requirement for Python versions less than 3.8 (and there's a [PR to adjust that to 3.5](https://github.com/apache/beam/pull/10821)). We're already importing typing in numerous places throughout the code.
Issue Time Tracking --- Worklog Id: (was: 384684) Time Spent: 11.5h (was: 11h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384682=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384682 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:42 Start Date: 10/Feb/20 19:42 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377275909 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp') + +class ThreadsafeRestrictionTracker(object):
Review comment: Thanks for pointing this out, but I don't think adding type annotations should be in the scope of this PR. The plan is: after this PR is checked in, I'll update https://github.com/apache/beam/pull/10375 and check it in, then open another PR for the type annotations. Also, I'm curious how we test the typing. Do we run `tox -e mypy`?
Issue Time Tracking --- Worklog Id: (was: 384682) Time Spent: 11h 10m (was: 11h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384683=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384683 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:42 Start Date: 10/Feb/20 19:42 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377275909 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp') + +class ThreadsafeRestrictionTracker(object):
Review comment: Thanks for pointing this out, but I don't think adding type annotations should be in the scope of this PR. The plan is: after this PR is checked in, I'll update https://github.com/apache/beam/pull/10375 and check it in, then open another PR for the type annotations. Also, I'm curious how we test the typing. Do we run `tox -e mypy`?
Issue Time Tracking --- Worklog Id: (was: 384683) Time Spent: 11h 20m (was: 11h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384677=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384677 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:38 Start Date: 10/Feb/20 19:38 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377273959 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp')
Review comment: I'm not sure whether I can use `typing.NamedTuple` directly, since it is only supported from Python 3.5 onward.
Issue Time Tracking --- Worklog Id: (was: 384677) Time Spent: 10h 50m (was: 10h 40m)
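For reference, `typing.NamedTuple` also has a functional form, `NamedTuple('Name', [(field, type), ...])`, which exists since Python 3.5; only the class syntax with variable annotations needs Python 3.6+. The reply in this thread states that `typing` is already a Beam requirement on older Pythons. A minimal sketch of a typed `SplitResultResidual`; the field types chosen here are assumptions, not what the PR adopted:

```python
from collections import namedtuple
from typing import TYPE_CHECKING, Any, NamedTuple, Optional

if TYPE_CHECKING:
  # Seen only by type checkers such as mypy; never imported at runtime.
  from apache_beam.utils.timestamp import Timestamp  # noqa: F401

# Untyped version, as in the diff.
UntypedSplitResultResidual = namedtuple(
    'SplitResultResidual',
    'windowed_value current_watermark deferred_timestamp')

# Typed version using the functional form of typing.NamedTuple, which does
# not need the Python 3.6+ class syntax with variable annotations.
SplitResultResidual = NamedTuple(
    'SplitResultResidual',
    [('windowed_value', Any),
     ('current_watermark', Optional['Timestamp']),
     ('deferred_timestamp', Optional['Timestamp'])])

residual = SplitResultResidual(
    windowed_value='wv', current_watermark=None, deferred_timestamp=None)
```

The typed class has the same fields and tuple behavior as the `collections.namedtuple` version, so it should be a drop-in replacement for existing callers.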
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384678=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384678 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:38 Start Date: 10/Feb/20 19:38 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377273984 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value')
Review comment: To keep a structure similar to `SplitResultResidual`.
Issue Time Tracking --- Worklog Id: (was: 384678) Time Spent: 11h (was: 10h 50m)
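One practical effect of the parallel structure is that code can read `.windowed_value` from either split part without checking which type it has. A minimal sketch; `windowed_values_of` is a hypothetical helper, and plain strings stand in for windowed values:

```python
from collections import namedtuple

SplitResultPrimary = namedtuple('SplitResultPrimary', 'windowed_value')
SplitResultResidual = namedtuple(
    'SplitResultResidual',
    'windowed_value current_watermark deferred_timestamp')


def windowed_values_of(split_parts):
  """Hypothetical helper: returns the payload of every primary or residual.

  Both types name their payload ``windowed_value``, so no type check is needed.
  """
  return [part.windowed_value for part in split_parts]


parts = [
    SplitResultPrimary(windowed_value='p0'),
    SplitResultResidual(
        windowed_value='r0', current_watermark=None, deferred_timestamp=None),
]
print(windowed_values_of(parts))  # ['p0', 'r0']
```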
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384671=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384671 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:28 Start Date: 10/Feb/20 19:28 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377269238 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp') + +class ThreadsafeRestrictionTracker(object): + """A thread-safe wrapper which wraps a `RestrictionTracker`. + + This wrapper guarantees synchronization of modifying restrictions across + multiple threads. + """ + def __init__(self, restriction_tracker): +# type: (RestrictionTracker) -> None +from apache_beam.io.iobase import RestrictionTracker +if not isinstance(restriction_tracker, RestrictionTracker): + raise ValueError( + 'Initialize ThreadsafeRestrictionTracker requires ' + 'RestrictionTracker.') +self._restriction_tracker = restriction_tracker +# Records an absolute timestamp when defer_remainder is called. +self._deferred_timestamp = None +self._lock = threading.RLock() +self._deferred_residual = None +self._deferred_watermark = None + + def current_restriction(self): +with self._lock: + return self._restriction_tracker.current_restriction() + + def try_claim(self, position): +with self._lock: + return self._restriction_tracker.try_claim(position) + + def defer_remainder(self, deferred_time=None): +"""Performs self-checkpoint on current processing restriction with an +expected resuming time. + +Self-checkpoint could happen while processing elements.
When executing a +DoFn.process(), you may want to stop processing an element and resume +later if the current element has been processing for quite a long time, or if you +want to have some outputs from other elements. ``defer_remainder()`` can be +called per element if needed. + +Args: + deferred_time: A relative ``timestamp.Duration`` that indicates the ideal + time gap between now and resuming, or an absolute ``timestamp.Timestamp`` + for resuming execution time. If deferred_time is None, the deferred work + will be executed as soon as possible. +""" + +# Record current time for calculating deferred_time later. +self._deferred_timestamp = timestamp.Timestamp.now() +if (deferred_time and not isinstance(deferred_time, timestamp.Duration) and +not isinstance(deferred_time, timestamp.Timestamp)): + raise ValueError( + 'The timestamp of defer_remainder() should be a ' + 'Duration or a Timestamp, or None.') +self._deferred_watermark = deferred_time +checkpoint = self.try_split(0) +if checkpoint: + _, self._deferred_residual = checkpoint + + def check_done(self): +with self._lock: + return self._restriction_tracker.check_done() + + def current_progress(self): +with self._lock: + return self._restriction_tracker.current_progress() + + def try_split(self, fraction_of_remainder): +with
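The locking pattern in the diff (every call to the wrapped tracker is made while holding one `threading.RLock`), together with checkpointing via `try_split(0)` as `defer_remainder()` does, can be tried outside Beam. In this sketch `ToyOffsetTracker` and `ThreadsafeTracker` are hypothetical, simplified classes, not Beam's `OffsetRestrictionTracker` or the PR's `ThreadsafeRestrictionTracker`:

```python
import threading


class ToyOffsetTracker(object):
  """Hypothetical tracker over positions [start, stop); not thread-safe."""

  def __init__(self, start, stop):
    self._start = start
    self._stop = stop
    self._last_claimed = None

  def current_restriction(self):
    return (self._start, self._stop)

  def try_claim(self, position):
    # Read-check-write on shared state: racy if called from several threads.
    if position >= self._stop:
      return False
    if self._last_claimed is not None and position <= self._last_claimed:
      return False
    self._last_claimed = position
    return True

  def try_split(self, fraction_of_remainder):
    # Splits the unclaimed remainder; a fraction of 0 is a checkpoint.
    if self._last_claimed is None:
      resume_from = self._start
    else:
      resume_from = self._last_claimed + 1
    split_point = resume_from + int(
        (self._stop - resume_from) * fraction_of_remainder)
    if split_point >= self._stop:
      return None
    primary, residual = (self._start, split_point), (split_point, self._stop)
    self._stop = split_point
    return primary, residual


class ThreadsafeTracker(object):
  """Routes every call to the wrapped tracker through one reentrant lock."""

  def __init__(self, tracker):
    self._tracker = tracker
    self._lock = threading.RLock()

  def current_restriction(self):
    with self._lock:
      return self._tracker.current_restriction()

  def try_claim(self, position):
    with self._lock:
      return self._tracker.try_claim(position)

  def try_split(self, fraction_of_remainder):
    with self._lock:
      return self._tracker.try_split(fraction_of_remainder)

  def checkpoint(self):
    # Holds the lock, then calls try_split(), which acquires it again;
    # this works only because RLock is reentrant (a plain Lock would deadlock).
    with self._lock:
      return self.try_split(0)


tracker = ThreadsafeTracker(ToyOffsetTracker(0, 10))
tracker.try_claim(0)
tracker.try_claim(1)
print(tracker.checkpoint())  # ((0, 2), (2, 10))
print(tracker.try_claim(2))  # False: position 2 now belongs to the residual
```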
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384669=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384669 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:28 Start Date: 10/Feb/20 19:28 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377269069 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp') + +class ThreadsafeRestrictionTracker(object):
Review comment: I know that this class is just being copied over from another file, but I'd love to see some type annotations added to it.
Issue Time Tracking --- Worklog Id: (was: 384669) Time Spent: 10.5h (was: 10h 20m)
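Because the module still supports Python 2 (note the `from __future__` and `builtins` imports in the diff), annotations for a class like this would likely use PEP 484 type comments rather than inline annotations. A sketch of what that could look like; `AnnotatedTrackerWrapper` is hypothetical, and the types chosen (`Any` for positions and restrictions, `float` for the split fraction) are assumptions rather than the signatures Beam adopted:

```python
import threading
from typing import TYPE_CHECKING, Any, Optional, Tuple

if TYPE_CHECKING:
  # Seen only by type checkers; the diff likewise imports this under
  # TYPE_CHECKING and does the runtime import locally.
  from apache_beam.io.iobase import RestrictionTracker  # noqa: F401


class AnnotatedTrackerWrapper(object):
  """Hypothetical wrapper showing Python 2 compatible PEP 484 type comments."""

  def __init__(self, restriction_tracker):
    # type: (RestrictionTracker) -> None
    self._restriction_tracker = restriction_tracker
    self._lock = threading.RLock()
    self._deferred_residual = None  # type: Optional[Any]

  def current_restriction(self):
    # type: () -> Any
    with self._lock:
      return self._restriction_tracker.current_restriction()

  def try_claim(self, position):
    # type: (Any) -> bool
    with self._lock:
      return self._restriction_tracker.try_claim(position)

  def try_split(self, fraction_of_remainder):
    # type: (float) -> Optional[Tuple[Any, Any]]
    with self._lock:
      return self._restriction_tracker.try_split(fraction_of_remainder)
```

Type comments are ignored at runtime, so the `TYPE_CHECKING` import costs nothing outside of a type-checking run.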
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384668=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384668 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 10/Feb/20 19:27 Start Date: 10/Feb/20 19:27 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase URL: https://github.com/apache/beam/pull/10802#discussion_r377268604 ## File path: sdks/python/apache_beam/runners/sdf_utils.py ## @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +"""Common utility class to help SDK harness to execute an SDF. 
""" + +from __future__ import absolute_import +from __future__ import division + +import logging +import threading +from builtins import object +from collections import namedtuple +from typing import TYPE_CHECKING +from typing import Any +from typing import Optional +from typing import Tuple + +from apache_beam.utils import timestamp + +if TYPE_CHECKING: + from apache_beam.io.iobase import RestrictionTracker + from apache_beam.utils.timestamp import Timestamp + +_LOGGER = logging.getLogger(__name__) + + +SplitResultPrimary = namedtuple( +'SplitResultPrimary', 'windowed_value') + +SplitResultResidual = namedtuple( +'SplitResultResidual', +'windowed_value current_watermark deferred_timestamp') + +class ThreadsafeRestrictionTracker(object): + """A thread-safe wrapper which wraps a `RestrictionTracker`. + + This wrapper guarantees synchronization of modifying restrictions across + multiple threads. + """ + def __init__(self, restriction_tracker): +# type: (RestrictionTracker) -> None +from apache_beam.io.iobase import RestrictionTracker +if not isinstance(restriction_tracker, RestrictionTracker): + raise ValueError( + 'Initialize ThreadsafeRestrictionTracker requires ' + 'RestrictionTracker.') +self._restriction_tracker = restriction_tracker +# Records an absolute timestamp when defer_remainder is called. +self._deferred_timestamp = None +self._lock = threading.RLock() +self._deferred_residual = None +self._deferred_watermark = None + + def current_restriction(self): +with self._lock: + return self._restriction_tracker.current_restriction() + + def try_claim(self, position): +with self._lock: + return self._restriction_tracker.try_claim(position) + + def defer_remainder(self, deferred_time=None): +"""Performs self-checkpoint on current processing restriction with an +expected resuming time. + +Self-checkpoint could happen while processing elements.
When executing an +DoFn.process(), you may want to stop processing an element and resuming +later if current element has been processed quit a long time or you also +want to have some outputs from other elements. ``defer_remainder()`` can be +called on per element if needed. + +Args: + deferred_time: A relative ``timestamp.Duration`` that indicates the ideal + time gap between now and resuming, or an absolute ``timestamp.Timestamp`` + for resuming execution time. If the time_delay is None, the deferred work + will be executed as soon as possible. +""" + +# Record current time for calculating deferred_time later. +self._deferred_timestamp = timestamp.Timestamp.now() +if (deferred_time and not isinstance(deferred_time, timestamp.Duration) and +not isinstance(deferred_time, timestamp.Timestamp)): + raise ValueError( + 'The timestamp of deter_remainder() should be a ' + 'Duration or a Timestamp, or None.') +self._deferred_watermark = deferred_time +checkpoint = self.try_split(0) +if checkpoint: + _, self._deferred_residual = checkpoint + + def check_done(self): +with self._lock: + return self._restriction_tracker.check_done() + + def current_progress(self): +with self._lock: + return self._restriction_tracker.current_progress() + + def try_split(self, fraction_of_remainder): +with
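The wrapper quoted above serializes every tracker call behind a single `threading.RLock`. A re-entrant lock lets a method that already holds the lock call another locked method on the same object without deadlocking, which a plain `threading.Lock` would not allow. The following is a minimal standalone sketch of that pattern, not Beam code: `ToyRangeTracker`, `LockingTracker` and `checkpoint()` are hypothetical names, and `checkpoint()` imitates a self-checkpoint by splitting with fraction 0, as `defer_remainder()` does in the diff.

```python
import threading


class ToyRangeTracker(object):
  """Hypothetical, non-thread-safe tracker over the integer range [start, stop)."""

  def __init__(self, start, stop):
    self._start = start
    self._stop = stop
    self._next = start  # first position that has not been claimed yet

  def current_restriction(self):
    return (self._start, self._stop)

  def try_claim(self, position):
    if position >= self._stop:
      return False
    self._next = position + 1
    return True

  def try_split(self, fraction_of_remainder):
    # Split the unclaimed remainder [next, stop). A fraction of 0 keeps no
    # further work in the primary, which is what a self-checkpoint needs.
    remainder = self._stop - self._next
    split_at = self._next + int(remainder * fraction_of_remainder)
    if split_at >= self._stop:
      return None
    primary, residual = (self._start, split_at), (split_at, self._stop)
    self._stop = split_at
    return primary, residual


class LockingTracker(object):
  """Serializes all access to a wrapped tracker with one re-entrant lock."""

  def __init__(self, tracker):
    self._tracker = tracker
    self._lock = threading.RLock()

  def current_restriction(self):
    with self._lock:
      return self._tracker.current_restriction()

  def try_claim(self, position):
    with self._lock:
      return self._tracker.try_claim(position)

  def try_split(self, fraction_of_remainder):
    with self._lock:
      return self._tracker.try_split(fraction_of_remainder)

  def checkpoint(self):
    # Holds the lock, then calls try_split(), which acquires it again.
    # This only works because the lock is an RLock.
    with self._lock:
      split = self.try_split(0)
      return None if split is None else split[1]


tracker = LockingTracker(ToyRangeTracker(0, 10))
tracker.try_claim(0)
tracker.try_claim(3)
residual = tracker.checkpoint()
print(tracker.current_restriction(), residual)  # (0, 4) (4, 10)
```

After the checkpoint the primary keeps only the claimed prefix, and the residual covers the unclaimed work that would be resumed later.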
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384667=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384667 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 10/Feb/20 19:26
Start Date: 10/Feb/20 19:26
Worklog Time Spent: 10m
Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#discussion_r377268120

## File path: sdks/python/apache_beam/runners/sdf_utils.py
## @@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Common utility class to help SDK harness to execute an SDF.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import logging
+import threading
+from builtins import object
+from collections import namedtuple
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import Optional
+from typing import Tuple
+
+from apache_beam.utils import timestamp
+
+if TYPE_CHECKING:
+  from apache_beam.io.iobase import RestrictionTracker
+  from apache_beam.utils.timestamp import Timestamp
+
+_LOGGER = logging.getLogger(__name__)
+
+
+SplitResultPrimary = namedtuple(
+    'SplitResultPrimary', 'windowed_value')

Review comment:
Why use a `namedtuple` here if there is only one member?

Issue Time Tracking
-------------------
Worklog Id: (was: 384667)
Time Spent: 10h 10m (was: 10h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=384665=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384665 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 10/Feb/20 19:23
Start Date: 10/Feb/20 19:23
Worklog Time Spent: 10m
Work Description: chadrik commented on pull request #10802: [BEAM-8537] Move wrappers of RestrictionTracker out of iobase
URL: https://github.com/apache/beam/pull/10802#discussion_r377266828

## File path: sdks/python/apache_beam/runners/sdf_utils.py
## @@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Common utility class to help SDK harness to execute an SDF.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import logging
+import threading
+from builtins import object
+from collections import namedtuple
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import Optional
+from typing import Tuple
+
+from apache_beam.utils import timestamp
+
+if TYPE_CHECKING:
+  from apache_beam.io.iobase import RestrictionTracker
+  from apache_beam.utils.timestamp import Timestamp
+
+_LOGGER = logging.getLogger(__name__)
+
+
+SplitResultPrimary = namedtuple(
+    'SplitResultPrimary', 'windowed_value')
+
+SplitResultResidual = namedtuple(
+    'SplitResultResidual',
+    'windowed_value current_watermark deferred_timestamp')

Review comment:
Please use `typing.NamedTuple` so that we can track the types of members.

Issue Time Tracking
-------------------
Worklog Id: (was: 384665)
Time Spent: 10h (was: 9h 50m)
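The two review comments on these definitions can be illustrated together. A one-field tuple type still offers two things over a bare value: the value is read by name (`.windowed_value`), just as on the residual, and the primary becomes a type of its own, so code can tell a primary from a residual with `isinstance`. Switching to `typing.NamedTuple`, as requested, keeps that behaviour and attaches a type to each field for type checkers (nothing is enforced at runtime). The sketch below uses the functional form, which avoids the Python 3.6+ class-annotation syntax and matches the `# type:` comment style of the quoted file. The field types are assumptions; the Beam types are referenced only as strings under `TYPE_CHECKING`, mirroring the quoted file, so the snippet runs without Beam installed. `describe()` and the placeholder values are hypothetical.

```python
from typing import TYPE_CHECKING
from typing import NamedTuple
from typing import Optional
from typing import Union

if TYPE_CHECKING:
  # Seen only by type checkers, so Beam need not be installed to run this.
  from apache_beam.utils.timestamp import Duration
  from apache_beam.utils.timestamp import Timestamp
  from apache_beam.utils.windowed_value import WindowedValue

# The field types below are assumptions chosen for illustration.
SplitResultPrimary = NamedTuple(
    'SplitResultPrimary', [('windowed_value', 'WindowedValue')])

SplitResultResidual = NamedTuple(
    'SplitResultResidual',
    [('windowed_value', 'WindowedValue'),
     ('current_watermark', Optional['Timestamp']),
     ('deferred_timestamp', Optional[Union['Timestamp', 'Duration']])])


def describe(part):
  """Hypothetical helper: dispatches on the result type, not on tuple length."""
  if isinstance(part, SplitResultResidual):
    return 'residual of %s (watermark=%s)' % (
        part.windowed_value, part.current_watermark)
  if isinstance(part, SplitResultPrimary):
    return 'primary of %s' % (part.windowed_value,)
  raise TypeError('not a split result: %r' % (part,))


primary = SplitResultPrimary(windowed_value='elem[0, 4)')
residual = SplitResultResidual(
    windowed_value='elem[4, 10)', current_watermark=42,
    deferred_timestamp=None)
print(describe(primary))   # primary of elem[0, 4)
print(describe(residual))  # residual of elem[4, 10) (watermark=42)
print(residual._fields)
# ('windowed_value', 'current_watermark', 'deferred_timestamp')
```

If distinguishing the two results by type does not matter to callers, returning the windowed value directly would be the simpler choice for the primary.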
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379693=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379693 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 31/Jan/20 00:55
Start Date: 31/Jan/20 00:55
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373271523

## File path: sdks/python/apache_beam/runners/common.py
## @@ -648,24 +856,26 @@ def invoke_process(self,
         raise ValueError(
             'A RestrictionTracker %r was provided but DoFn does not have a '
             'RestrictionTrackerParam defined' % restriction_tracker)
-      from apache_beam.io import iobase
-      self.threadsafe_restriction_tracker = iobase.ThreadsafeRestrictionTracker(
-          restriction_tracker)
+      self.threadsafe_restriction_tracker = _ThreadsafeRestrictionTracker(
+          restriction_tracker, self._synchronized_lock)
       additional_kwargs[restriction_tracker_param] = (
-          iobase.RestrictionTrackerView(self.threadsafe_restriction_tracker))
-
-    if self.watermark_estimator:
-      # The watermark estimator needs to be reset for every element.
-      self.watermark_estimator.reset()

Review comment:
With the latest changes, the watermark_estimator is created from the estimator_state, so it no longer needs to be reset for every element.

Issue Time Tracking
-------------------
Worklog Id: (was: 379693)
Time Spent: 9h 50m (was: 9h 40m)
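The reply describes a move from resetting one long-lived estimator per element to building a fresh estimator from saved state. A minimal standalone sketch of that lifecycle follows. `observe_timestamp`, `current_watermark` and `get_estimator_state` are names used in this thread; `initial_estimator_state` and `create_watermark_estimator` are provider method names I am assuming here (they follow the shape of the Python SDK's later `WatermarkEstimatorProvider`, but this sketch does not import Beam). `MaxTimestampEstimator`, its provider and `run_once()` are hypothetical, and the "largest timestamp seen" rule is only a toy.

```python
class MaxTimestampEstimator(object):
  """Hypothetical estimator: the watermark is the largest timestamp seen."""

  def __init__(self, estimator_state):
    # The state is simply the last watermark, or None for a fresh element.
    self._watermark = estimator_state

  def observe_timestamp(self, timestamp):
    if self._watermark is None or timestamp > self._watermark:
      self._watermark = timestamp

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    return self._watermark


class MaxTimestampEstimatorProvider(object):
  """Hypothetical provider: builds one estimator per element attempt."""

  def initial_estimator_state(self, element, restriction):
    return None

  def create_watermark_estimator(self, estimator_state):
    return MaxTimestampEstimator(estimator_state)


def run_once(provider, estimator_state, output_timestamps):
  """Processes one element attempt and returns the state to checkpoint."""
  # A new estimator is built from state every time, so reset() is never needed.
  estimator = provider.create_watermark_estimator(estimator_state)
  for ts in output_timestamps:
    estimator.observe_timestamp(ts)
  return estimator.get_estimator_state()


provider = MaxTimestampEstimatorProvider()
state = provider.initial_estimator_state('element', (0, 10))
state = run_once(provider, state, [5, 9, 7])  # first attempt, then checkpoint
state = run_once(provider, state, [8])        # the resumed attempt keeps 9
print(state)  # 9
```

Because the resumed estimator starts from the checkpointed state, its watermark never moves back to an earlier value between attempts.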
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379685=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379685 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 31/Jan/20 00:26
Start Date: 31/Jan/20 00:26
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373244316

## File path: sdks/python/apache_beam/io/watermark_estimators.py
## @@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all output records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new pair, the initial value is None. When
+    resuming processing, the initial timestamp will be the last reported
+    watermark.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '

Review comment:
This is an interesting question. I prefer stopping the pipeline by raising an error here. I think observing timestamps and emitting late data should be done inside the DoFn body by the SDF author, given that `MonotonicWatermarkEstimator.observe_timestamp()` is called by the system after the DoFn outputs. We can expand the docstring to make option 1 more explicit.

Issue Time Tracking
-------------------
Worklog Id: (was: 379685)
Time Spent: 9h 40m (was: 9.5h)
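The quoted `observe_timestamp()` raises as soon as an output timestamp goes backwards, which is the "stop the pipeline" option the reply prefers. Below is a standalone sketch of that behaviour, assuming plain integers in place of Beam `Timestamp`s. The class name and the error message are illustrative, since the diff truncates the real message. Equal timestamps are accepted, matching the strict `<` comparison in the diff.

```python
class MonotonicEstimatorSketch(object):
  """Illustrative monotonic estimator that rejects out-of-order timestamps."""

  def __init__(self, timestamp=None):
    # None for a new element; the last reported watermark when resuming.
    self._watermark = timestamp

  def observe_timestamp(self, timestamp):
    if self._watermark is not None and timestamp < self._watermark:
      # Fail loudly instead of silently emitting late data.
      raise ValueError(
          'Output timestamp %s is earlier than the current watermark %s' %
          (timestamp, self._watermark))
    self._watermark = timestamp

  def current_watermark(self):
    return self._watermark


estimator = MonotonicEstimatorSketch()
for ts in (10, 12, 12, 15):
  estimator.observe_timestamp(ts)
print(estimator.current_watermark())  # 15
try:
  estimator.observe_timestamp(11)
except ValueError as e:
  print('rejected:', e)
```

The check runs before the assignment, so a rejected timestamp leaves the watermark unchanged. An SDF author who wants to tolerate late data would have to handle it in the DoFn body, as the reply suggests.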
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379683=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379683 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 31/Jan/20 00:24
Start Date: 31/Jan/20 00:24
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373263710

## File path: sdks/python/apache_beam/runners/common.py
## @@ -969,7 +1212,10 @@ def _reraise_augmented(self, exn):
 class OutputProcessor(object):
-  def process_outputs(self, windowed_input_element, results):
+  def process_outputs(self,
+                      windowed_input_element,
+                      results,
+                      watermark_estimator=None):

Review comment:
> Is there value to making this an optional argument or should we make it required?

Without a no-op estimator, this argument has to stay optional for DoFns that don't provide a watermark_estimator. If we decide to provide a no-op estimator, then this argument should be required.

> Or, even better, perhaps pass it in as a constructor argument (if it will never change)?

The estimator should stay the same for a given element. If the OutputProcessor is created per element, then yes, we can make it a constructor argument.

Issue Time Tracking
-------------------
Worklog Id: (was: 379683)
Time Spent: 9h 20m (was: 9h 10m)
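The constructor-argument option from this exchange can be sketched concretely. Below, a hypothetical per-element output processor receives the estimator once, in its constructor, instead of on every `process_outputs()` call; passing `None` keeps the estimator optional, as in the quoted signature. The class names and the `(value, timestamp)` output pairs are assumptions for illustration, not Beam's `OutputProcessor` API.

```python
class RecordingEstimator(object):
  """Hypothetical estimator that remembers every timestamp it observes."""

  def __init__(self):
    self.observed = []

  def observe_timestamp(self, timestamp):
    self.observed.append(timestamp)


class PerElementOutputProcessor(object):
  """Hypothetical output processor created once per input element."""

  def __init__(self, watermark_estimator=None):
    # Fixed for the lifetime of the element, so it need not be a call argument.
    self._watermark_estimator = watermark_estimator

  def process_outputs(self, results):
    emitted = []
    for value, timestamp in results:
      if self._watermark_estimator is not None:
        self._watermark_estimator.observe_timestamp(timestamp)
      emitted.append((value, timestamp))
    return emitted


estimator = RecordingEstimator()
processor = PerElementOutputProcessor(estimator)
print(processor.process_outputs([('a', 1), ('b', 3)]))  # [('a', 1), ('b', 3)]
print(estimator.observed)  # [1, 3]
print(PerElementOutputProcessor().process_outputs([('c', 5)]))  # [('c', 5)]
```

If a no-op estimator were always supplied, the `is not None` branch could go away and the argument could become required, which is the trade-off the reply describes.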
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379684=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379684 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 31/Jan/20 00:24
Start Date: 31/Jan/20 00:24
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373263710

## File path: sdks/python/apache_beam/runners/common.py
## @@ -969,7 +1212,10 @@ def _reraise_augmented(self, exn):
 class OutputProcessor(object):
-  def process_outputs(self, windowed_input_element, results):
+  def process_outputs(self,
+                      windowed_input_element,
+                      results,
+                      watermark_estimator=None):

Review comment:
> Is there value to making this an optional argument or should we make it required?

Without a no-op estimator, this argument has to stay optional for DoFns that don't provide a watermark_estimator. If we decide to provide a no-op estimator, then this argument should be required.

> Or, even better, perhaps pass it in as a constructor argument (if it will never change)?

The estimator should stay the same for a given element. If the OutputProcessor is created per element, then yes, we can make it a constructor argument.

Issue Time Tracking
-------------------
Worklog Id: (was: 379684)
Time Spent: 9.5h (was: 9h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379680=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379680 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 31/Jan/20 00:18
Start Date: 31/Jan/20 00:18
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373262173

## File path: sdks/python/apache_beam/runners/common.py
## @@ -765,41 +978,63 @@ def _invoke_process_per_window(self,
       # ProcessSizedElementAndRestriction.
       self.threadsafe_restriction_tracker.check_done()
       deferred_status = self.threadsafe_restriction_tracker.deferred_status()
-      output_watermark = None
-      if self.watermark_estimator:
-        output_watermark = self.watermark_estimator.current_watermark()
       if deferred_status:
         deferred_restriction, deferred_watermark = deferred_status
         element = windowed_value.value
         size = self.signature.get_restriction_provider().restriction_size(
             element, deferred_restriction)
-        return ((
-            windowed_value.with_value(((element, deferred_restriction), size)),
-            output_watermark), deferred_watermark)
-    return None
+        if self.threadsafe_watermark_estimator:
+          output_watermark = (
+              self.threadsafe_watermark_estimator.current_watermark())
+          estimator_state = (
+              self.threadsafe_watermark_estimator.get_estimator_state())
+          return ((
+              windowed_value.with_value(
+                  ((element, (deferred_restriction, estimator_state)), size)),
+              output_watermark), deferred_watermark)
+        else:
+          return ((
+              windowed_value.with_value(
+                  ((element, deferred_restriction), size)), None),
+              deferred_watermark)
+    return None
 
   def try_split(self, fraction):
     # type: (...) -> Optional[Tuple[SplitResultType, SplitResultType]]
-    if self.threadsafe_restriction_tracker and self.current_windowed_value:
-      # Temporary workaround for [BEAM-7473]: get current_watermark before
-      # split, in case watermark gets advanced before getting split results.
-      # In worst case, current_watermark is always stale, which is ok.
-      if self.watermark_estimator:
-        current_watermark = self.watermark_estimator.current_watermark()
-      else:
-        current_watermark = None
-      split = self.threadsafe_restriction_tracker.try_split(fraction)
+    restriction_tracker = self.threadsafe_restriction_tracker
+    current_windowed_value = self.current_windowed_value
+    if restriction_tracker and current_windowed_value:
+      current_watermark = None
+      # Make sure that the RestrictionTracker and WatermarkEstimator are locked
+      # together.
+      with self._synchronized_lock:
+        split = restriction_tracker.try_split(fraction)
+        if self.threadsafe_watermark_estimator:
+          current_watermark = (
+              self.threadsafe_watermark_estimator.current_watermark_with_lock())
+          estimator_state = (self.threadsafe_watermark_estimator
+                             .get_estimator_state_with_lock())
       if split:
         primary, residual = split
         element = self.current_windowed_value.value
         restriction_provider = self.signature.get_restriction_provider()
         primary_size = restriction_provider.restriction_size(element, primary)
         residual_size = restriction_provider.restriction_size(element, residual)
-        return (
-            ((self.current_windowed_value.with_value((
-                (element, primary), primary_size)), None), None),
-            ((self.current_windowed_value.with_value((
-                (element, residual), residual_size)), current_watermark), None))
+        if self.threadsafe_watermark_estimator:
+          return (
+              ((self.current_windowed_value.with_value((

Review comment:
I like the idea of a named tuple. I also agree that the primary should be different from the residual. If that sounds good, I can make these changes together with moving `ThreadsafeRestrictionTracker` out of `common.py`.

Issue Time Tracking
-------------------
Worklog Id: (was: 379680)
Time Spent: 9h 10m (was: 9h)
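The new `try_split()` takes `self._synchronized_lock` around both the tracker split and the estimator reads, so the watermark and estimator state handed to the residual match exactly the positions claimed before the split. The sketch below is a standalone model of that idea, not Beam code. `ToyTracker`, `ToyEstimator` and `SplittableInvoker` are hypothetical, and the separate `Primary`/`Residual` named tuples reflect the agreement in the reply that the two results should differ (only the residual carries watermark data).

```python
import threading
from collections import namedtuple

# Hypothetical result types: the primary and the residual are distinct, and
# only the residual carries the watermark snapshot and estimator state.
Primary = namedtuple('Primary', 'restriction')
Residual = namedtuple('Residual', 'restriction watermark estimator_state')


class ToyTracker(object):
  """Hypothetical tracker over the integer range [start, stop)."""

  def __init__(self, start, stop):
    self.start, self.stop, self.next = start, stop, start

  def try_claim(self, position):
    if position >= self.stop:
      return False
    self.next = position + 1
    return True

  def try_split(self, fraction):
    split_at = self.next + int((self.stop - self.next) * fraction)
    if split_at >= self.stop:
      return None
    primary, residual = (self.start, split_at), (split_at, self.stop)
    self.stop = split_at
    return primary, residual


class ToyEstimator(object):
  """Hypothetical estimator whose watermark is the max observed timestamp."""

  def __init__(self):
    self.watermark = None

  def observe_timestamp(self, timestamp):
    if self.watermark is None or timestamp > self.watermark:
      self.watermark = timestamp

  def get_estimator_state(self):
    return self.watermark


class SplittableInvoker(object):
  """Guards the tracker and the estimator with one shared lock."""

  def __init__(self, tracker, estimator):
    self._lock = threading.RLock()
    self._tracker = tracker
    self._estimator = estimator

  def claim_and_output(self, position, timestamp):
    # Claiming a position and observing its output timestamp happen atomically.
    with self._lock:
      if not self._tracker.try_claim(position):
        return False
      self._estimator.observe_timestamp(timestamp)
      return True

  def try_split(self, fraction):
    # Split and snapshot under the same lock, so the residual's watermark
    # reflects exactly the positions claimed before the split.
    with self._lock:
      split = self._tracker.try_split(fraction)
      watermark = self._estimator.watermark
      state = self._estimator.get_estimator_state()
    if split is None:
      return None
    primary, residual = split
    return Primary(primary), Residual(residual, watermark, state)


invoker = SplittableInvoker(ToyTracker(0, 10), ToyEstimator())
invoker.claim_and_output(0, 100)
invoker.claim_and_output(1, 105)
primary, residual = invoker.try_split(0.5)
print(primary)   # Primary(restriction=(0, 6))
print(residual)  # Residual(restriction=(6, 10), watermark=105, estimator_state=105)
```

Without the shared lock, another thread could claim a position and advance the watermark between the split and the snapshot, leaving the residual with a watermark that does not correspond to its split point.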
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379678=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379678 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 31/Jan/20 00:08
Start Date: 31/Jan/20 00:08
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373259759

## File path: sdks/python/apache_beam/runners/common.py
## @@ -323,6 +330,27 @@ def is_splittable_dofn(self):
     # type: () -> bool
     return self.get_restriction_provider() is not None
 
+  def is_tracking_watermark(self):
+    return self.get_watermark_estimator_provider() is not None
+
+  def get_restriction_coder(self):
+    """Get coder for a restriction when processing an SDF.
+
+    If the SDF uses a WatermarkEstimatorProvider, the restriction coder is a

Review comment:
With the current changes, we only produce the tuple when we need to. For example, if there is a watermark_estimator, we return (element, (restriction, estimator_state)); if no watermark_estimator is provided, we return (element, restriction). The restriction_coder follows the same rule.

Providing a no-op estimator is a good idea. I'm wondering whether it's also feasible for other SDKs, for example by using `None` as the estimator_state.

Issue Time Tracking
-------------------
Worklog Id: (was: 379678)
Time Spent: 9h (was: 8h 50m)
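The reply weighs two ways of packing a restriction: conditionally, depending on whether an estimator exists, or always as a `(restriction, estimator_state)` pair with a no-op estimator whose state is `None`. A standalone sketch of both shapes follows. The function names and `NoOpEstimatorSketch` are hypothetical, and whether other SDKs can agree on `None` as the no-op state is exactly the open question in the reply.

```python
class NoOpEstimatorSketch(object):
  """Hypothetical estimator that tracks nothing and whose state is None."""

  def observe_timestamp(self, timestamp):
    pass

  def current_watermark(self):
    return None

  def get_estimator_state(self):
    return None


def pack_conditionally(element, restriction, estimator=None):
  # The shape depends on whether the DoFn tracks a watermark, so the
  # restriction coder has to make the same decision.
  if estimator is None:
    return (element, restriction)
  return (element, (restriction, estimator.get_estimator_state()))


def pack_uniformly(element, restriction, estimator=None):
  # Falls back to the no-op estimator, so the shape never changes.
  if estimator is None:
    estimator = NoOpEstimatorSketch()
  return (element, (restriction, estimator.get_estimator_state()))


print(pack_conditionally('e', (0, 5)))  # ('e', (0, 5))
print(pack_uniformly('e', (0, 5)))      # ('e', ((0, 5), None))
```

The uniform shape carries an extra `None` per restriction, but it lets a single coder handle every SDF regardless of whether it tracks a watermark.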
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379656=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379656 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 30/Jan/20 23:13
Start Date: 30/Jan/20 23:13
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373244316

## File path: sdks/python/apache_beam/io/watermark_estimators.py
## @@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all output records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new pair, the initial value is None. When
+    resuming processing, the initial timestamp will be the last reported
+    watermark.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '

Review comment:
This is an interesting question. I prefer stopping the pipeline by raising an error here. I think observing timestamps and emitting late data should be done inside the DoFn body by the SDF author, given that `MonotonicWatermarkEstimator.observe_timestamp()` is called by the system. We can expand the docstring to make option 1 more explicit.

Issue Time Tracking
-------------------
Worklog Id: (was: 379656)
Time Spent: 8h 50m (was: 8h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379638=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379638 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 22:15 Start Date: 30/Jan/20 22:15 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373223075 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -1035,6 +1284,8 @@ def process_outputs(self, windowed_input_element, results): windowed_value.windows *= len(windowed_input_element.windows) else: windowed_value = windowed_input_element.with_value(result) + if watermark_estimator: Review comment: Thanks for pointing out the performance issue! Issue Time Tracking --- Worklog Id: (was: 379638) Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379632=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379632 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 22:03 Start Date: 30/Jan/20 22:03 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373218142 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -503,6 +534,182 @@ def invoke_process(self, windowed_value, self.process_method(windowed_value.value)) +class _ThreadsafeWatermarkEstimator(object): Review comment: Thanks for pointing that out. I'm curious how `common.py` is compiled and how other modules get involved. If moving these classes into a separate helper file (and importing that file from here) doesn't increase compile time, then that's the way to go. Issue Time Tracking --- Worklog Id: (was: 379632) Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379628=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379628 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 21:58 Start Date: 30/Jan/20 21:58 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373216098 ## File path: sdks/python/apache_beam/io/iobase.py ## @@ -1238,128 +1235,38 @@ def try_claim(self, position): raise NotImplementedError -class ThreadsafeRestrictionTracker(object): Review comment: Are you suggesting that I create a new PR to move existing classes like `ThreadsafeRestrictionTracker`, and make this PR focus on the watermark estimator only? If that's the case, I would prefer letting Chad's [PR](https://github.com/apache/beam/pull/10593) check in first. I can then do the refactor of the `ThreadsafeRestrictionTracker`-related code on top of the typing changes. Issue Time Tracking --- Worklog Id: (was: 379628) Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379620=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379620 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 21:41 Start Date: 30/Jan/20 21:41 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373202584 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -323,6 +330,27 @@ def is_splittable_dofn(self): # type: () -> bool return self.get_restriction_provider() is not None + def is_tracking_watermark(self): +return self.get_watermark_estimator_provider() is not None + + def get_restriction_coder(self): +"""Get coder for a restriction when processing an SDF. + +If the SDF uses an WatermarkEstimatorProvider, the restriction coder is a Review comment: On this note, are we correctly producing tuples only when we should? (Or is the restriction coder by default pickle_coder, in which case we might not notice by default?) Issue Time Tracking --- Worklog Id: (was: 379620) Time Spent: 8h 10m (was: 8h)
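The concern about tuples can be illustrated with a small, stdlib-only sketch. Everything here is hypothetical: `to_coded_restriction`, `PickleCoder` and `StrictPairCoder` are not Beam classes. A pickle-based coder round-trips both a bare restriction and a `(restriction, estimator_state)` pair, so producing the wrong shape goes unnoticed. A coder that checks for a 2-tuple fails immediately.

```python
import pickle


def to_coded_restriction(restriction, estimator_state, tracks_watermark):
  # Pair the restriction with estimator state only for SDFs that track
  # a watermark; otherwise pass the bare restriction through.
  if tracks_watermark:
    return (restriction, estimator_state)
  return restriction


class PickleCoder(object):
  """Hypothetical pickle-based default coder: it accepts any shape, so a
  missing or extra tuple layer goes unnoticed."""

  def encode(self, value):
    return pickle.dumps(value)

  def decode(self, data):
    return pickle.loads(data)


class StrictPairCoder(PickleCoder):
  """Hypothetical coder for (restriction, estimator_state) pairs that
  fails loudly when handed something that is not a 2-tuple."""

  def encode(self, value):
    if not (isinstance(value, tuple) and len(value) == 2):
      raise TypeError(
          'expected (restriction, estimator_state), got %r' % (value,))
    return super(StrictPairCoder, self).encode(value)
```

The shape check is only a heuristic: a restriction that is itself a 2-tuple would still pass. It complements, rather than replaces, choosing the coder from the DoFn signature.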
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379618=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379618 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 21:41 Start Date: 30/Jan/20 21:41 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373204325 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -503,6 +534,182 @@ def invoke_process(self, windowed_value, self.process_method(windowed_value.value)) +class _ThreadsafeWatermarkEstimator(object): + """A threadsafe wrapper which wraps a WatermarkEstimator with locking + mechanism to guarantee multi-thread safety. + """ + def __init__(self, watermark_estimator, lock): +from apache_beam.io.iobase import WatermarkEstimator +if not isinstance(watermark_estimator, WatermarkEstimator): + raise ValueError('Initializing Threadsafe requires a WatermarkEstimator') +self._watermark_estimator = watermark_estimator +self._lock = lock + + def __getattr__(self, attr): +if hasattr(self._watermark_estimator, attr): + def method_wrapper(*args, **kw): +with self._lock: + return getattr(self._watermark_estimator, attr)(*args, **kw) + return method_wrapper +raise AttributeError(attr) + + def get_estimator_state_with_lock(self): +# The caller should hold the lock before entering this function. +if not self._lock.locked(): + raise RuntimeError('Expected lock to be held to guarantee thread-safe ' + 'access.') +return self._watermark_estimator.get_estimator_state() + + def get_estimator_state(self): +with self._lock: + return self.get_estimator_state_with_lock() + + def current_watermark_with_lock(self): +# The caller should hold the lock before entering this function. 
+if not self._lock.locked(): + raise RuntimeError('Expected lock to be held to guarantee thread-safe ' + 'access.') +return self._watermark_estimator.current_watermark() + + def current_watermark(self): +with self._lock: + return self.current_watermark_with_lock() + + def observe_timestamp(self, timestamp): +if not isinstance(timestamp, Timestamp): + raise ValueError('Input of observe_timestamp should be a Timestamp ' + 'object') +with self._lock: + self._watermark_estimator.observe_timestamp(timestamp) + + +class _ThreadsafeRestrictionTracker(object): + """A thread-safe wrapper which wraps a `RestrictionTracker`. + + This wrapper guarantees synchronization of modifying restrictions across + multiple threads. + """ + + def __init__(self, restriction_tracker, lock): +from apache_beam.io.iobase import RestrictionTracker +if not isinstance(restriction_tracker, RestrictionTracker): + raise ValueError( + 'Initialize ThreadsafeRestrictionTracker requires' + 'RestrictionTracker.') +self._restriction_tracker = restriction_tracker +# Records an absolute timestamp when defer_remainder is called. +self._deferred_timestamp = None +self._lock = lock +self._deferred_residual = None +self._deferred_watermark = None + + def current_restriction(self): +with self._lock: + return self._restriction_tracker.current_restriction() + + def try_claim(self, position): +with self._lock: + return self._restriction_tracker.try_claim(position) + + def defer_remainder(self, deferred_time=None): +"""Performs self-checkpoint on current processing restriction with an +expected resuming time. + +Self-checkpoint could happen during processing elements. When executing an +DoFn.process(), you may want to stop processing an element and resuming +later if current element has been processed quit a long time or you also +want to have some outputs from other elements. ``defer_remainder()`` can be +called on per element if needed. 
+ +Args: + deferred_time: A relative ``Duration`` that indicates the ideal time gap + between now and resuming, or an absolute ``Timestamp`` for resuming + execution time. If the time_delay is None, the deferred work will be + executed as soon as possible. +""" + +# Record current time for calculating deferred_time later. +with self._lock: + self._deferred_timestamp = Timestamp.now() + if (deferred_time and + not isinstance(deferred_time, Duration) and + not isinstance(deferred_time, Timestamp)): +raise ValueError('The timestamp of deter_remainder() should be a ' + 'Duration or a Timestamp, or None.') + self._deferred_watermark =
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379613=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379613 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 21:41 Start Date: 30/Jan/20 21:41 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373207576 ## File path: sdks/python/apache_beam/io/watermark_estimators.py ## @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A collection of WatermarkEstimator implementations that SplittableDoFns +can use.""" + +# pytype: skip-file + +from __future__ import absolute_import + +from apache_beam.io.iobase import WatermarkEstimator +from apache_beam.utils.timestamp import Timestamp + + +class MonotonicWatermarkEstimator(WatermarkEstimator): + """A WatermarkEstimator which assumes that timestamps of all ouput records + are increasing monotonically. + """ + def __init__(self, timestamp): +"""For a new pair, the initial value is None. When +resuming processing, the initial timestamp will be the last reported +watermark. 
+""" +self._watermark = timestamp + + def observe_timestamp(self, timestamp): +if self._watermark is None: + self._watermark = timestamp +else: + if timestamp < self._watermark: +raise ValueError('A MonotonicWatermarkEstimator expects output ' Review comment: Here's a conundrum: it might not be very easy for the user to actually catch and act on this error. (For example, say they're reading log entries and they think it's ordered but one occasionally goes backward). There are three options: (1) Stop the pipeline. (2) Suppress the error and emit the item as possibly late data. (3) Move the timestamp forward to respect the watermark. Should this be configurable? Note that option (3) would require a return value here. Issue Time Tracking --- Worklog Id: (was: 379613) Time Spent: 7.5h (was: 7h 20m)
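The three options listed in this comment could be made configurable roughly as follows. This is a hypothetical sketch that assumes numeric timestamps and made-up names (`OutOfOrderPolicy`, `ConfigurableMonotonicEstimator`). As the comment points out, option (3) forces `observe_timestamp()` to return the timestamp the caller should actually emit with.

```python
class OutOfOrderPolicy(object):
  FAIL = 'fail'               # (1) stop the pipeline
  EMIT_AS_LATE = 'emit_late'  # (2) keep the timestamp, emit as possibly late
  ADVANCE = 'advance'         # (3) move the timestamp forward


class ConfigurableMonotonicEstimator(object):
  """Hypothetical estimator whose reaction to a backward timestamp is
  chosen by a policy."""

  def __init__(self, policy=OutOfOrderPolicy.FAIL, timestamp=None):
    if policy not in (OutOfOrderPolicy.FAIL, OutOfOrderPolicy.EMIT_AS_LATE,
                      OutOfOrderPolicy.ADVANCE):
      raise ValueError('Unknown policy %r' % (policy,))
    self._policy = policy
    self._watermark = timestamp

  def observe_timestamp(self, timestamp):
    """Returns the timestamp the caller should use for the output."""
    if self._watermark is None or timestamp >= self._watermark:
      self._watermark = timestamp
      return timestamp
    if self._policy == OutOfOrderPolicy.FAIL:
      raise ValueError('Timestamp %r is behind watermark %r' %
                       (timestamp, self._watermark))
    if self._policy == OutOfOrderPolicy.EMIT_AS_LATE:
      # The watermark never moves backward; the output may be late.
      return timestamp
    # ADVANCE: clamp the output timestamp to the current watermark.
    return self._watermark

  def current_watermark(self):
    return self._watermark
```

In every policy the watermark itself only moves forward. The policies differ only in what happens to the offending output.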
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379608=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379608 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 21:40 Start Date: 30/Jan/20 21:40 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373200835 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -771,22 +810,33 @@ def try_split(self, fraction): # Temporary workaround for [BEAM-7473]: get current_watermark before # split, in case watermark gets advanced before getting split results. # In worst case, current_watermark is always stale, which is ok. - if self.watermark_estimator: -current_watermark = self.watermark_estimator.current_watermark() - else: -current_watermark = None + current_watermark = None split = restriction_tracker.try_split(fraction) if split: primary, residual = split element = self.current_windowed_value.value restriction_provider = self.signature.get_restriction_provider() primary_size = restriction_provider.restriction_size(element, primary) residual_size = restriction_provider.restriction_size(element, residual) -return ( -((self.current_windowed_value.with_value(( -(element, primary), primary_size)), None), None), -((self.current_windowed_value.with_value(( -(element, residual), residual_size)), current_watermark), None)) +if self.threadsafe_watermark_estimator: + current_watermark = ( + self.threadsafe_watermark_estimator.current_watermark()) + estimator_state = ( + self.threadsafe_watermark_estimator.current_estimator_state()) + return ( + ((self.current_windowed_value.with_value(( + (element, (primary, None)), primary_size)), None), None), Review comment: I think it's fine to re-create it rather than store it--no use caching away the original estimator state right after we create it just to put 
it here. Issue Time Tracking --- Worklog Id: (was: 379608) Time Spent: 7h (was: 6h 50m)
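Below is one possible reading of this comment, sketched with hypothetical names (`split_with_estimator_state`, `initial_state_fn`, `SketchEstimator`). Rather than caching the original estimator state when the estimator is first created, the primary's state is re-derived at split time. The residual carries the state the estimator has accumulated so far. The tuple layout only loosely mirrors the `(element, (restriction, state))` shape in the diff and is an assumption.

```python
class SketchEstimator(object):
  """Hypothetical estimator exposing its serializable state."""

  def __init__(self, state):
    self._state = state

  def get_estimator_state(self):
    return self._state


def split_with_estimator_state(element, primary, residual,
                               initial_state_fn, estimator):
  """Hypothetical helper pairing each half of a split with estimator state."""
  # Re-derive the primary's state at split time instead of caching the
  # original state when the estimator was created.
  primary_state = initial_state_fn(element, primary)
  # The residual resumes later, so it carries the state observed so far.
  residual_state = estimator.get_estimator_state()
  return ((element, (primary, primary_state)),
          (element, (residual, residual_state)))
```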
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379617=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379617 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 21:41 Start Date: 30/Jan/20 21:41 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373205930 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -648,24 +856,26 @@ def invoke_process(self, raise ValueError( 'A RestrictionTracker %r was provided but DoFn does not have a ' 'RestrictionTrackerParam defined' % restriction_tracker) - from apache_beam.io import iobase - self.threadsafe_restriction_tracker = iobase.ThreadsafeRestrictionTracker( - restriction_tracker) + self.threadsafe_restriction_tracker = _ThreadsafeRestrictionTracker( + restriction_tracker, self._synchronized_lock) additional_kwargs[restriction_tracker_param] = ( - iobase.RestrictionTrackerView(self.threadsafe_restriction_tracker)) - - if self.watermark_estimator: -# The watermark estimator needs to be reset for every element. -self.watermark_estimator.reset() Review comment: Shouldn't we be setting it with the watermark estimator state that belongs to this element? Issue Time Tracking --- Worklog Id: (was: 379617) Time Spent: 7h 50m (was: 7h 40m)
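The question above can be sketched as follows: restore the estimator from this element's own state rather than resetting a shared instance. `SketchEstimator` and `process_element` are hypothetical names. In this sketch the estimator's whole state is just its watermark, and the `(element, (restriction, estimator_state))` layout is assumed for illustration.

```python
class SketchEstimator(object):
  """Hypothetical estimator whose whole state is its current watermark,
  taken as the maximum timestamp observed."""

  def __init__(self, estimator_state=None):
    self._watermark = estimator_state

  def observe_timestamp(self, timestamp):
    if self._watermark is None or timestamp > self._watermark:
      self._watermark = timestamp

  def get_estimator_state(self):
    return self._watermark


def process_element(element_with_state, output_timestamps):
  """Hypothetical per-element driver: restore the estimator from the
  state carried with this element instead of resetting a shared one."""
  element, (restriction, estimator_state) = element_with_state
  estimator = SketchEstimator(estimator_state)
  for ts in output_timestamps:
    estimator.observe_timestamp(ts)
  # Returned so that a residual could carry it forward.
  return estimator.get_estimator_state()
```

Take an element resumed with state 10 that then observes timestamps 3 and 1. A reset would report a watermark of 3, while restoring from the carried state keeps it at 10.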
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379610=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379610 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 21:41 Start Date: 30/Jan/20 21:41 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373198094 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -527,11 +734,11 @@ def __init__(self, signature.is_stateful_dofn()) self.user_state_context = user_state_context self.is_splittable = signature.is_splittable_dofn() -self.watermark_estimator = self.signature.get_watermark_estimator() -self.watermark_estimator_param = ( -self.signature.process_method.watermark_estimator_arg_name -if self.watermark_estimator else None) -self.threadsafe_restriction_tracker = None # type: Optional[iobase.ThreadsafeRestrictionTracker] +self.threadsafe_restriction_tracker = None +self.threadsafe_watermark_estimator = None +# The lock which guarantee synchronization for both +# ThreadsafeRestrictionTracker and ThreadsafeWatermarkEstimator. +self._synchronized_lock = threading.Lock() Review comment: IMHO making this lock external to both classes, and having some methods that do locking (exclusive of holding the lock) and others that require holding the lock, introduces complexity that is not worth the benefit (presumably making sure the watermark is never slightly stale). Issue Time Tracking --- Worklog Id: (was: 379610) Time Spent: 7h 20m (was: 7h 10m)
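For contrast with the shared external lock in the diff, here is a minimal sketch of the simpler design this comment hints at. Each wrapper owns its own `threading.Lock`, and every public method acquires it, so there are no "caller must already hold the lock" variants. `LockedEstimator` and `MaxEstimator` are hypothetical names.

```python
import threading


class MaxEstimator(object):
  """Hypothetical, non-thread-safe estimator: watermark = max timestamp."""

  def __init__(self):
    self._watermark = None

  def observe_timestamp(self, timestamp):
    if self._watermark is None or timestamp > self._watermark:
      self._watermark = timestamp

  def current_watermark(self):
    return self._watermark


class LockedEstimator(object):
  """Hypothetical wrapper that owns its lock; every public method takes
  it, so no caller needs to know whether a lock is already held."""

  def __init__(self, estimator):
    self._estimator = estimator
    self._lock = threading.Lock()

  def observe_timestamp(self, timestamp):
    with self._lock:
      self._estimator.observe_timestamp(timestamp)

  def current_watermark(self):
    with self._lock:
      return self._estimator.current_watermark()
```

The cost is the benefit the comment names. When the restriction tracker and the estimator are read under separate locks during a split, the reported watermark may be slightly stale relative to the restriction.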
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379611=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379611 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 30/Jan/20 21:41 Start Date: 30/Jan/20 21:41 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r373196064 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -323,6 +330,27 @@ def is_splittable_dofn(self): # type: () -> bool return self.get_restriction_provider() is not None + def is_tracking_watermark(self): +return self.get_watermark_estimator_provider() is not None + + def get_restriction_coder(self): +"""Get coder for a restriction when processing an SDF. + +If the SDF uses an WatermarkEstimatorProvider, the restriction coder is a Review comment: I wonder if it would be preferable to have a no-op estimator if one is not provided, rather than have if statements here (and elsewhere) to handle two separate cases. Issue Time Tracking --- Worklog Id: (was: 379611) Time Spent: 7.5h (was: 7h 20m)
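A no-op estimator is an instance of the null-object pattern. The sketch below uses hypothetical `NoOpEstimator` and `emit_all` to show the effect: the output loop calls `observe_timestamp()` unconditionally instead of branching on whether an estimator exists.

```python
class NoOpEstimator(object):
  """Hypothetical null-object estimator for DoFns that do not provide one."""

  def observe_timestamp(self, timestamp):
    pass  # Deliberately ignores every timestamp.

  def current_watermark(self):
    return None

  def get_estimator_state(self):
    return None


def emit_all(results, estimator):
  """Hypothetical output loop: no `if estimator:` branch is needed."""
  emitted = []
  for value, ts in results:
    estimator.observe_timestamp(ts)
    emitted.append(value)
  return emitted
```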
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379619=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379619 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 30/Jan/20 21:41
Start Date: 30/Jan/20 21:41
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373205181

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -969,7 +1212,10 @@ def _reraise_augmented(self, exn):
 class OutputProcessor(object):
-  def process_outputs(self, windowed_input_element, results):
+  def process_outputs(self,
+                      windowed_input_element,
+                      results,
+                      watermark_estimator=None):

Review comment:
Or, even better, perhaps pass it in as a constructor argument (if it will never change)?

Issue Time Tracking
---
Worklog Id: (was: 379619)
Time Spent: 8h (was: 7h 50m)
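A minimal sketch of the constructor-argument alternative, assuming the estimator stays fixed for the processor's lifetime (the reviewer's own caveat: if a new estimator is created per windowed value, a per-call parameter is still needed). `OutputProcessorSketch` and `RecordingEstimator` are hypothetical names, not Beam classes, and results are plain `(value, timestamp)` pairs rather than windowed values.

```python
class RecordingEstimator(object):
  """Hypothetical estimator stand-in that records every observed timestamp."""

  def __init__(self):
    self.observed = []

  def observe_timestamp(self, timestamp):
    self.observed.append(timestamp)


class OutputProcessorSketch(object):
  """Hypothetical processor whose estimator is fixed at construction time."""

  def __init__(self, watermark_estimator=None):
    # Injected once; process_outputs no longer needs an optional argument.
    self._watermark_estimator = watermark_estimator
    self.emitted = []

  def process_outputs(self, results):
    # Each result is a (value, timestamp) pair in this sketch.
    for value, timestamp in results:
      if self._watermark_estimator is not None:
        self._watermark_estimator.observe_timestamp(timestamp)
      self.emitted.append(value)
```

With this shape the call site stays `process_outputs(results)` for both splittable and non-splittable DoFns; only construction differs.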
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379616=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379616 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 30/Jan/20 21:41
Start Date: 30/Jan/20 21:41
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373202880

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -969,7 +1212,10 @@ def _reraise_augmented(self, exn):
 class OutputProcessor(object):
-  def process_outputs(self, windowed_input_element, results):
+  def process_outputs(self,
+                      windowed_input_element,
+                      results,
+                      watermark_estimator=None):

Review comment:
Is there value to making this an optional argument or should we make it required?

Issue Time Tracking
---
Worklog Id: (was: 379616)
Time Spent: 7h 50m (was: 7h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379612=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379612 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 30/Jan/20 21:41
Start Date: 30/Jan/20 21:41
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373202214

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -765,41 +978,63 @@ def _invoke_process_per_window(self,
       # ProcessSizedElementAndRestriction.
       self.threadsafe_restriction_tracker.check_done()
       deferred_status = self.threadsafe_restriction_tracker.deferred_status()
-      output_watermark = None
-      if self.watermark_estimator:
-        output_watermark = self.watermark_estimator.current_watermark()
       if deferred_status:
         deferred_restriction, deferred_watermark = deferred_status
         element = windowed_value.value
         size = self.signature.get_restriction_provider().restriction_size(
             element, deferred_restriction)
-        return ((
-            windowed_value.with_value(((element, deferred_restriction), size)),
-            output_watermark), deferred_watermark)
-    return None
+        if self.threadsafe_watermark_estimator:
+          output_watermark = (
+              self.threadsafe_watermark_estimator.current_watermark())
+          estimator_state = (
+              self.threadsafe_watermark_estimator.get_estimator_state())
+          return ((
+              windowed_value.with_value(
+                  ((element, (deferred_restriction, estimator_state)), size)),
+              output_watermark), deferred_watermark)
+        else:
+          return ((
+              windowed_value.with_value(
+                  ((element, deferred_restriction), size)), None),
+                  deferred_watermark)
+    return None

   def try_split(self, fraction):
     # type: (...) -> Optional[Tuple[SplitResultType, SplitResultType]]
-    if self.threadsafe_restriction_tracker and self.current_windowed_value:
-      # Temporary workaround for [BEAM-7473]: get current_watermark before
-      # split, in case watermark gets advanced before getting split results.
-      # In worst case, current_watermark is always stale, which is ok.
-      if self.watermark_estimator:
-        current_watermark = self.watermark_estimator.current_watermark()
-      else:
-        current_watermark = None
-      split = self.threadsafe_restriction_tracker.try_split(fraction)
+    restriction_tracker = self.threadsafe_restriction_tracker
+    current_windowed_value = self.current_windowed_value
+    if restriction_tracker and current_windowed_value:
+      current_watermark = None
+      # Make sure that the RestrictionTracker and WatermarkEstimator are locked
+      # together.
+      with self._synchronized_lock:
+        split = restriction_tracker.try_split(fraction)
+        if self.threadsafe_watermark_estimator:
+          current_watermark = (
+              self.threadsafe_watermark_estimator.current_watermark_with_lock())
+          estimator_state = (self.threadsafe_watermark_estimator
+                             .get_estimator_state_with_lock())
       if split:
         primary, residual = split
         element = self.current_windowed_value.value
         restriction_provider = self.signature.get_restriction_provider()
         primary_size = restriction_provider.restriction_size(element, primary)
         residual_size = restriction_provider.restriction_size(element, residual)
-        return (
-            ((self.current_windowed_value.with_value((
-                (element, primary), primary_size)), None), None),
-            ((self.current_windowed_value.with_value((
-                (element, residual), residual_size)), current_watermark), None))
+        if self.threadsafe_watermark_estimator:
+          return (
+              ((self.current_windowed_value.with_value((

Review comment:
As mentioned in the typing PRs, these tuples of tuples of tuples are getting a bit unwieldy. Perhaps we could introduce a class (or at least a named tuple)?
Open question: should the primary be the same type as the residual, or should it be a different type (given that it can hold a watermark and a delay time)?

Issue Time Tracking
---
Worklog Id: (was: 379612)
Time Spent: 7.5h (was: 7h 20m)
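One possible shape for the named-tuple idea, combined with the shared-lock pattern from the try_split diff. This is a sketch, not Beam's implementation: `SplitPrimary`, `SplitResidual`, `OffsetTracker`, `WatermarkHolder` and the free function `try_split` are hypothetical, and timestamps are plain ints. It answers the open question one way, giving the residual its own type because only the residual carries a watermark.

```python
import threading
from typing import NamedTuple, Optional

# Hypothetical result types replacing nested tuples; names are illustrative.
SplitPrimary = NamedTuple('SplitPrimary', [('value', object)])
SplitResidual = NamedTuple(
    'SplitResidual', [('value', object), ('current_watermark', Optional[int])])


class OffsetTracker(object):
  """Hypothetical restriction tracker over the half-open range [start, stop)."""

  def __init__(self, start, stop):
    self.start, self.stop = start, stop
    self.position = start  # First offset not yet claimed.

  def try_split(self, fraction):
    # Split the unclaimed remainder [position, stop) at the given fraction.
    split_at = self.position + int((self.stop - self.position) * fraction)
    if split_at >= self.stop:
      return None  # Nothing left to hand off as a residual.
    residual = (split_at, self.stop)
    self.stop = split_at  # This tracker keeps only the primary range.
    return (self.start, split_at), residual


class WatermarkHolder(object):
  """Hypothetical estimator that simply reports a stored watermark."""

  def __init__(self, watermark=None):
    self.watermark = watermark

  def current_watermark(self):
    return self.watermark


def try_split(tracker, estimator, lock, fraction):
  # Split and read the watermark under one lock acquisition so no other
  # thread can advance the watermark between the two steps.
  with lock:
    split = tracker.try_split(fraction)
    watermark = estimator.current_watermark()
  if split is None:
    return None
  primary, residual = split
  return SplitPrimary(primary), SplitResidual(residual, watermark)
```

Callers then read `residual.current_watermark` by name instead of indexing into nested tuples.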
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379609=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379609 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 30/Jan/20 21:40
Start Date: 30/Jan/20 21:40
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373196289

## File path: sdks/python/apache_beam/io/iobase.py ##
@@ -1238,128 +1235,38 @@ def try_claim(self, position):
     raise NotImplementedError
-class ThreadsafeRestrictionTracker(object):

Review comment:
Could you split out a separate commit that moves these classes from the ones that add new logic?

Issue Time Tracking
---
Worklog Id: (was: 379609)
Time Spent: 7h 10m (was: 7h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379615=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379615 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 30/Jan/20 21:41
Start Date: 30/Jan/20 21:41
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373203216

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -1035,6 +1284,8 @@ def process_outputs(self, windowed_input_element, results):
           windowed_value.windows *= len(windowed_input_element.windows)
       else:
         windowed_value = windowed_input_element.with_value(result)
+      if watermark_estimator:

Review comment:
`is not None` (this is performance-sensitive code, as it's called for every value for every transform).

Issue Time Tracking
---
Worklog Id: (was: 379615)
Time Spent: 7h 40m (was: 7.5h)
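To illustrate the difference behind the `is not None` suggestion: `if watermark_estimator:` is a truthiness test, which under Python's data model may call a user-defined `__bool__` or `__len__` on every element, while `watermark_estimator is not None` is a single identity comparison. The hypothetical `CountingEstimator` below defines `__len__` returning 0, so the sketch also shows that truthiness can misclassify a real estimator as absent. No timing figures are implied.

```python
class CountingEstimator(object):
  """Hypothetical estimator that defines __len__ but not __bool__."""

  def __init__(self):
    self.len_calls = 0

  def __len__(self):
    # Truthiness falls back to __len__ when __bool__ is not defined.
    self.len_calls += 1
    return 0


def should_observe_truthy(estimator):
  # Truthiness test: may dispatch into __bool__/__len__ on every call.
  return True if estimator else False


def should_observe_identity(estimator):
  # Identity comparison: never calls back into the object.
  return estimator is not None
```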
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379614=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379614 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 30/Jan/20 21:41
Start Date: 30/Jan/20 21:41
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373204109

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -503,6 +534,182 @@ def invoke_process(self, windowed_value,
         self.process_method(windowed_value.value))
+class _ThreadsafeWatermarkEstimator(object):

Review comment:
Just a thought: putting all of these in `common.py` requires them to be compiled, and compilation is already a bottleneck (e.g. in testing sometimes we compile it on every worker). Perhaps we could move these to another helper file?

Issue Time Tracking
---
Worklog Id: (was: 379614)
Time Spent: 7h 40m (was: 7.5h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=379607=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379607 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 30/Jan/20 21:40
Start Date: 30/Jan/20 21:40
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r373193475

## File path: sdks/python/apache_beam/io/watermark_estimators.py ##
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all ouput records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new tracking pair, the initial value
+    should be None. When resuming from residual, the initial timestamp should
+    be the same timestamp as the estimated watermark from primary.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '
+                         'timestamp to be increasing monotonically.')

Review comment:
I agree, it observes monotonically increasing timestamps and computes, based on those, a watermark.

Issue Time Tracking
---
Worklog Id: (was: 379607)
Time Spent: 7h (was: 6h 50m)
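A standalone sketch of the monotonic estimator quoted above, assuming plain int timestamps instead of `apache_beam.utils.timestamp.Timestamp` and filling in the part the excerpt cuts off (advancing the watermark on a valid observation). `MonotonicWatermarkEstimatorSketch` is a hypothetical name; equal timestamps are accepted, matching the quoted `timestamp < self._watermark` check.

```python
class MonotonicWatermarkEstimatorSketch(object):
  """Hypothetical standalone version of a monotonic watermark estimator."""

  def __init__(self, timestamp=None):
    # None for a fresh restriction; the primary's watermark when resuming
    # from a residual.
    self._watermark = timestamp

  def observe_timestamp(self, timestamp):
    # Reject any output timestamp that moves backwards.
    if self._watermark is not None and timestamp < self._watermark:
      raise ValueError('A MonotonicWatermarkEstimator expects output '
                       'timestamp to be increasing monotonically.')
    self._watermark = timestamp

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    # Resuming a residual only needs the latest watermark.
    return self._watermark
```

Passing `get_estimator_state()` into a new instance mirrors the docstring's "resuming from residual" case.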
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371899=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371899 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Jan/20 21:43
Start Date: 14/Jan/20 21:43
Worklog Time Spent: 10m
Work Description: yifanzou commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-574387844

Run Portable_Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 371899)
Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371900=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371900 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Jan/20 21:43
Start Date: 14/Jan/20 21:43
Worklog Time Spent: 10m
Work Description: yifanzou commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-574387907

Run PythonLint PreCommit

Issue Time Tracking
---
Worklog Id: (was: 371900)
Time Spent: 6h 40m (was: 6.5h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371901=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371901 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Jan/20 21:43
Start Date: 14/Jan/20 21:43
Worklog Time Spent: 10m
Work Description: yifanzou commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-574387984

Run RAT PreCommit

Issue Time Tracking
---
Worklog Id: (was: 371901)
Time Spent: 6h 50m (was: 6h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371897=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371897 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Jan/20 21:42
Start Date: 14/Jan/20 21:42
Worklog Time Spent: 10m
Work Description: yifanzou commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-574387600

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 371897)
Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371898=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371898 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Jan/20 21:42
Start Date: 14/Jan/20 21:42
Worklog Time Spent: 10m
Work Description: yifanzou commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-574387782

Run Python2_PVR_Flink PreCommit

Issue Time Tracking
---
Worklog Id: (was: 371898)
Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371875=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371875 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Jan/20 21:14
Start Date: 14/Jan/20 21:14
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#issuecomment-574376929

Retest all please

Issue Time Tracking
---
Worklog Id: (was: 371875)
Time Spent: 6h (was: 5h 50m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371345=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371345 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 14/Jan/20 04:18
Start Date: 14/Jan/20 04:18
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366146267

## File path: sdks/python/apache_beam/runners/common.py ##
@@ -519,11 +705,11 @@ def __init__(self,
         signature.is_stateful_dofn())
     self.user_state_context = user_state_context
     self.is_splittable = signature.is_splittable_dofn()
-    self.watermark_estimator = self.signature.get_watermark_estimator()
-    self.watermark_estimator_param = (
-        self.signature.process_method.watermark_estimator_arg_name
-        if self.watermark_estimator else None)
     self.threadsafe_restriction_tracker = None
+    self.threadsafe_watermark_estimator = None
+    # The lock which guarantee synchronization for both
+    # ThreadsafeRestrictionTracker and ThreadsafeWatermarkEstimator.
+    self._synchronized_lock = threading.RLock()

Review comment:
Thinking about this again, we don't really need an RLock here. Thanks!

Issue Time Tracking
---
Worklog Id: (was: 371345)
Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371338=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371338 ]

ASF GitHub Bot logged work on BEAM-8537:

Author: ASF GitHub Bot
Created on: 14/Jan/20 03:57
Start Date: 14/Jan/20 03:57
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366142878

## File path: sdks/python/apache_beam/runners/common.py
## @@ -495,6 +525,162 @@ def invoke_process(self, windowed_value,
           self.process_method(windowed_value.value))


+class _ThreadsafeWatermarkEstimator(object):
+  """A threadsafe wrapper which wraps a WatermarkEstimator with locking
+  mechanism to guarantee multi-thread safety.
+  """
+  def __init__(self, watermark_estimator, lock):
+    if not isinstance(watermark_estimator, iobase.WatermarkEstimator):
+      raise ValueError('Initializing Threadsafe requires a WatermarkEstimator')
+    self._watermark_estimator = watermark_estimator
+    self._lock = lock
+
+  def __getattr__(self, attr):
+    if hasattr(self._watermark_estimator, attr):
+      def method_wrapper(*args, **kw):
+        with self._lock:
+          return getattr(self._watermark_estimator, attr)(*args, **kw)
+      return method_wrapper
+    raise AttributeError(attr)
+
+  def get_estimator_state(self):
+    # The caller should hold the lock before entering this function.
+    return self._watermark_estimator.get_estimator_state()
+
+  def current_watermark(self):
+    # The caller should hold the lock before entering this function.
+    return self._watermark_estimator.current_watermark()
+
+  def observe_timestamp(self, timestamp):
+    if not isinstance(timestamp, Timestamp):
+      raise ValueError('Input of observe_timestamp should be a Timestamp '
+                       'object')
+    with self._lock:
+      self._watermark_estimator.observe_timestamp(timestamp)
+
+
+class _ThreadsafeRestrictionTracker(object):
+  """A thread-safe wrapper which wraps a `RestritionTracker`.
+
+  This wrapper guarantees synchronization of modifying restrictions across
+  multi-thread.
+  """
+
+  def __init__(self, restriction_tracker, lock):
+    if not isinstance(restriction_tracker, iobase.RestrictionTracker):
+      raise ValueError(
+          'Initialize ThreadsafeRestrictionTracker requires'
+          'RestrictionTracker.')
+    self._restriction_tracker = restriction_tracker
+    # Records an absolute timestamp when defer_remainder is called.
+    self._deferred_timestamp = None
+    self._lock = lock
+    self._deferred_residual = None
+    self._deferred_watermark = None
+
+  def current_restriction(self):
+    with self._lock:
+      return self._restriction_tracker.current_restriction()
+
+  def try_claim(self, position):
+    with self._lock:
+      return self._restriction_tracker.try_claim(position)
+
+  def defer_remainder(self, deferred_time=None):
+    """Performs self-checkpoint on current processing restriction with an
+    expected resuming time.
+
+    Self-checkpoint could happen during processing elements. When executing an
+    DoFn.process(), you may want to stop processing an element and resuming
+    later if current element has been processed quit a long time or you also
+    want to have some outputs from other elements. ``defer_remainder()`` can be
+    called on per element if needed.
+
+    Args:
+      deferred_time: A relative ``Duration`` that indicates the ideal time gap
+        between now and resuming, or an absolute ``Timestamp`` for resuming
+        execution time. If the time_delay is None, the deferred work will be
+        executed as soon as possible.
+    """
+
+    # Record current time for calculating deferred_time later.
+    self._deferred_timestamp = Timestamp.now()
+    if (deferred_time and
+        not isinstance(deferred_time, Duration) and
+        not isinstance(deferred_time, Timestamp)):
+      raise ValueError('The timestamp of deter_remainder() should be a '
+                       'Duration or a Timestamp, or None.')
+    self._deferred_watermark = deferred_time
+    checkpoint = self.try_split(0)
+    if checkpoint:
+      _, self._deferred_residual = checkpoint
+
+  def check_done(self):
+    with self._lock:
+      return self._restriction_tracker.check_done()
+
+  def current_progress(self):
+    with self._lock:
+      return self._restriction_tracker.current_progress()
+
+  def try_split(self, fraction_of_remainder):
+    # The caller should hold the lock before entering this function.

Review comment:
Thanks! Changed it to `raise RuntimeError`
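The `defer_remainder()` docstring accepts three kinds of argument: a relative `Duration`, an absolute `Timestamp`, or `None`, meaning "resume as soon as possible". Below is a minimal sketch of resolving such an argument into an absolute resume time. It uses the standard library's `datetime.timedelta` and `datetime.datetime` as stand-ins for Beam's `Duration` and `Timestamp`. The function `resolve_resume_time` is hypothetical.

```python
import datetime


def resolve_resume_time(deferred_time, now):
  """Returns the absolute time at which deferred work should resume.

  deferred_time: None (resume as soon as possible), a timedelta (delay
  relative to `now`), or a datetime (absolute resume time).
  """
  if deferred_time is None:
    return now
  if isinstance(deferred_time, datetime.timedelta):
    return now + deferred_time
  if isinstance(deferred_time, datetime.datetime):
    return deferred_time
  raise ValueError('deferred_time should be a timedelta, a datetime, or None.')


now = datetime.datetime(2020, 1, 14, 4, 0, 0)
print(resolve_resume_time(None, now))                            # 2020-01-14 04:00:00
print(resolve_resume_time(datetime.timedelta(seconds=30), now))  # 2020-01-14 04:00:30
print(resolve_resume_time(datetime.datetime(2020, 1, 14, 5, 0, 0), now))
```

The sketch tests `deferred_time is None` explicitly. The quoted code relies on truthiness (`if (deferred_time and ...)`), so a falsy but invalid value such as `0` would skip validation there. Here it raises `ValueError` instead.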
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371329=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371329 ]

ASF GitHub Bot logged work on BEAM-8537:

Author: ASF GitHub Bot
Created on: 14/Jan/20 03:38
Start Date: 14/Jan/20 03:38
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366139637

## File path: sdks/python/apache_beam/io/watermark_estimators_test.py
## @@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for built-in WatermarkEstiamtors"""
+
+from __future__ import absolute_import
+
+import unittest
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
+from apache_beam.io.watermark_estimators import MonotonicWatermarkEstimator
+from apache_beam.io.watermark_estimators import WalltimeWatermarkEstimator
+from apache_beam.utils.timestamp import Duration
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimatorTest(unittest.TestCase):
+
+  def test_initialize_from_state(self):
+    timestamp = Timestamp(10)
+    watermark_estimator = MonotonicWatermarkEstimator(timestamp)
+    self.assertIsInstance(watermark_estimator, WatermarkEstimator)
+    self.assertEqual(watermark_estimator.get_estimator_state(), timestamp)
+
+  def test_observe_timestamp(self):
+    watermark_estimator = MonotonicWatermarkEstimator(Timestamp(10))
+    watermark_estimator.observe_timestamp(Timestamp(15))
+    self.assertEqual(watermark_estimator.current_watermark(), Timestamp(15))
+    watermark_estimator.observe_timestamp(Timestamp(20))
+    self.assertEqual(watermark_estimator.current_watermark(), Timestamp(20))
+    watermark_estimator.observe_timestamp(Timestamp(20))
+    self.assertEqual(watermark_estimator.current_watermark(), Timestamp(20))
+    with self.assertRaises(ValueError):
+      watermark_estimator.observe_timestamp(Timestamp(10))
+
+  def test_get_estimator_state(self):
+    watermark_estimator = MonotonicWatermarkEstimator(Timestamp(10))
+    watermark_estimator.observe_timestamp(Timestamp(15))
+    self.assertEqual(watermark_estimator.get_estimator_state(), Timestamp(15))
+
+
+class WalltimeWatermarkEstimatorTest(unittest.TestCase):
+
+  def test_initialization(self):
+    watermark_estiamtor = WalltimeWatermarkEstimator()
+    self.assertIsInstance(watermark_estiamtor, WatermarkEstimator)
+    self.assertIsNone(watermark_estiamtor.get_estimator_state())
+
+  def test_observe_timestamp(self):
+    watermark_estiamtor = WalltimeWatermarkEstimator()

Review comment:
It seems the Python SDK doesn't use a clock abstraction, so mocking should work instead.

Issue Time Tracking
---
Worklog Id: (was: 371329)
Time Spent: 5.5h (was: 5h 20m)
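The reply above proposes mocking rather than injecting a clock. The sketch below shows that approach under two assumptions:

- A hypothetical estimator reads `time.time()` directly.
- Its watermark never moves backwards. This is an assumption, not something stated in this thread.

`unittest.mock.patch` swaps out `time.time` inside each `with` block, so the expected watermark is deterministic. `SimpleWalltimeEstimator` is an illustrative stand-in, not Beam's `WalltimeWatermarkEstimator`.

```python
import time
import unittest
from unittest import mock


class SimpleWalltimeEstimator(object):
  """Hypothetical estimator: the watermark is the wall-clock time, and it
  never moves backwards."""

  def __init__(self, initial=None):
    self._watermark = initial

  def observe_timestamp(self, timestamp):
    # Output timestamps are ignored; only processing time matters.
    pass

  def current_watermark(self):
    now = time.time()  # looked up at call time, so mock.patch can replace it
    if self._watermark is None or now > self._watermark:
      self._watermark = now
    return self._watermark


class SimpleWalltimeEstimatorTest(unittest.TestCase):

  def test_current_watermark_follows_mocked_time(self):
    estimator = SimpleWalltimeEstimator()
    with mock.patch('time.time', return_value=100.0):
      self.assertEqual(estimator.current_watermark(), 100.0)
    with mock.patch('time.time', return_value=250.0):
      self.assertEqual(estimator.current_watermark(), 250.0)

  def test_watermark_never_regresses(self):
    estimator = SimpleWalltimeEstimator()
    with mock.patch('time.time', return_value=300.0):
      estimator.current_watermark()
    with mock.patch('time.time', return_value=200.0):
      self.assertEqual(estimator.current_watermark(), 300.0)
```

Run it with `python -m unittest <module>`. Patching works here because the estimator looks up `time.time` on the module each time it is called.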
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371313=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371313 ]

ASF GitHub Bot logged work on BEAM-8537:

Author: ASF GitHub Bot
Created on: 14/Jan/20 03:02
Start Date: 14/Jan/20 03:02
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366133403

## File path: sdks/python/apache_beam/io/watermark_estimators.py
## @@ -0,0 +1,101 @@
...
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all ouput records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new tracking pair, the initial value
+    should be None. When resuming from residual, the initial timestamp should
+    be the same timestamp as the estimated watermark from primary.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '
+                         'timestamp to be increasing monotonically.')
+      self._watermark = timestamp
+
+  def current_watermark(self):
+    return self._watermark
+
+  def get_estimator_state(self):
+    return self._watermark
+
+
+class WalltimeWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkExtimator which uses processing time as estimated watermark
+  simply.
+  """
+  def __init__(self, timestamp=None):
+    self._timestamp = timestamp

Review comment:
You are right. Thanks!

Issue Time Tracking
---
Worklog Id: (was: 371313)
Time Spent: 5h 20m (was: 5h 10m)
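The `MonotonicWatermarkEstimator.__init__` docstring describes a hand-off. A new tracking pair starts from `None`, and a residual resumes from the watermark the primary had estimated. Below is a stdlib-only sketch of that lifecycle, using plain integers as timestamps. `MonotonicEstimator` re-implements the quoted logic for illustration and is not imported from Beam.

```python
class MonotonicEstimator(object):
  """Re-implementation of the quoted logic, with plain ints as timestamps."""

  def __init__(self, state):
    # None for a brand-new pair; the primary's watermark when resuming.
    self._watermark = state

  def observe_timestamp(self, timestamp):
    # Equal timestamps are allowed; only a decrease is rejected.
    if self._watermark is not None and timestamp < self._watermark:
      raise ValueError('Output timestamps must be non-decreasing.')
    self._watermark = timestamp

  def current_watermark(self):
    return self._watermark

  def get_estimator_state(self):
    return self._watermark


# Primary: a fresh tracking pair starts with no watermark.
primary = MonotonicEstimator(None)
for ts in (10, 15, 20):
  primary.observe_timestamp(ts)

# Checkpoint: the residual is seeded with the primary's estimator state.
residual = MonotonicEstimator(primary.get_estimator_state())
print(residual.current_watermark())  # 20

residual.observe_timestamp(25)   # accepted: still non-decreasing
try:
  residual.observe_timestamp(5)  # earlier than the resumed watermark
except ValueError as e:
  print('rejected:', e)          # the watermark stays at 25
```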
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371301=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371301 ]

ASF GitHub Bot logged work on BEAM-8537:

Author: ASF GitHub Bot
Created on: 14/Jan/20 02:37
Start Date: 14/Jan/20 02:37
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366128556

## File path: sdks/python/apache_beam/io/watermark_estimators.py
## @@ -0,0 +1,101 @@
...
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '
+                         'timestamp to be increasing monotonically.')

Review comment:
`MonotonicWatermarkEstimator.observe_timestamp()` expects the timestamp of an output record, so to me it is conceptually a timestamp.

Issue Time Tracking
---
Worklog Id: (was: 371301)
Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371225=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371225 ]

ASF GitHub Bot logged work on BEAM-8537:

Author: ASF GitHub Bot
Created on: 13/Jan/20 23:59
Start Date: 13/Jan/20 23:59
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366087924

## File path: sdks/python/apache_beam/io/watermark_estimators.py
## @@ -0,0 +1,101 @@
...
+class WalltimeWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkExtimator which uses processing time as estimated watermark
+  simply.

Review comment:
```suggestion
```

Issue Time Tracking
---
Worklog Id: (was: 371225)
Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371222=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371222 ]

ASF GitHub Bot logged work on BEAM-8537:

Author: ASF GitHub Bot
Created on: 13/Jan/20 23:59
Start Date: 13/Jan/20 23:59
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366087786

## File path: sdks/python/apache_beam/io/watermark_estimators.py
## @@ -0,0 +1,101 @@
...
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '
+                         'timestamp to be increasing monotonically.')

Review comment:
```suggestion
                         'watermark to be increasing monotonically.')
```

Issue Time Tracking
---
Worklog Id: (was: 371222)
Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371223=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371223 ]

ASF GitHub Bot logged work on BEAM-8537:

Author: ASF GitHub Bot
Created on: 13/Jan/20 23:59
Start Date: 13/Jan/20 23:59
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366087897

## File path: sdks/python/apache_beam/io/watermark_estimators.py
## @@ -0,0 +1,101 @@
...
+class WalltimeWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkExtimator which uses processing time as estimated watermark

Review comment:
```suggestion
  """A WatermarkEstimator which uses processing time as the estimated watermark.
```

Issue Time Tracking
---
Worklog Id: (was: 371223)
Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371226=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371226 ] ASF GitHub Bot logged work on BEAM-8537: Author: ASF GitHub Bot Created on: 13/Jan/20 23:59 Start Date: 13/Jan/20 23:59 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark URL: https://github.com/apache/beam/pull/10375#discussion_r36603 ## File path: sdks/python/apache_beam/io/watermark_estimators_test.py ## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+"""Unit tests for built-in WatermarkEstiamtors"""
+
+from __future__ import absolute_import
+
+import unittest
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
+from apache_beam.io.watermark_estimators import MonotonicWatermarkEstimator
+from apache_beam.io.watermark_estimators import WalltimeWatermarkEstimator
+from apache_beam.utils.timestamp import Duration
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimatorTest(unittest.TestCase):
+
+  def test_initialize_from_state(self):
+    timestamp = Timestamp(10)
+    watermark_estimator = MonotonicWatermarkEstimator(timestamp)
+    self.assertIsInstance(watermark_estimator, WatermarkEstimator)
+    self.assertEqual(watermark_estimator.get_estimator_state(), timestamp)
+
+  def test_observe_timestamp(self):
+    watermark_estimator = MonotonicWatermarkEstimator(Timestamp(10))
+    watermark_estimator.observe_timestamp(Timestamp(15))
+    self.assertEqual(watermark_estimator.current_watermark(), Timestamp(15))
+    watermark_estimator.observe_timestamp(Timestamp(20))
+    self.assertEqual(watermark_estimator.current_watermark(), Timestamp(20))
+    watermark_estimator.observe_timestamp(Timestamp(20))
+    self.assertEqual(watermark_estimator.current_watermark(), Timestamp(20))
+    with self.assertRaises(ValueError):
+      watermark_estimator.observe_timestamp(Timestamp(10))
+
+  def test_get_estimator_state(self):
+    watermark_estimator = MonotonicWatermarkEstimator(Timestamp(10))
+    watermark_estimator.observe_timestamp(Timestamp(15))
+    self.assertEqual(watermark_estimator.get_estimator_state(), Timestamp(15))
+
+
+class WalltimeWatermarkEstimatorTest(unittest.TestCase):
+
+  def test_initialization(self):
+    watermark_estiamtor = WalltimeWatermarkEstimator()
+    self.assertIsInstance(watermark_estiamtor, WatermarkEstimator)
+    self.assertIsNone(watermark_estiamtor.get_estimator_state())
+
+  def test_observe_timestamp(self):
+    watermark_estiamtor = WalltimeWatermarkEstimator()

Review comment:
You can test this explicitly if you pass a clock in to the constructor; the same applies to the tests below.

Issue Time Tracking
---

Worklog Id: (was: 371226)
Time Spent: 4h 40m (was: 4.5h)
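The reviewer's point about passing in a clock can be sketched as follows. This is a minimal plain-Python illustration, not Beam's actual `WalltimeWatermarkEstimator`: the class `ClockedWalltimeEstimator`, its `clock` parameter, and the `FakeClock` helper are all hypothetical, and timestamps are floats (seconds since the epoch) rather than Beam `Timestamp` objects. Injecting the clock lets a test control processing time and assert exact watermark values instead of only checking types or `None`.

```python
import time
import unittest


class ClockedWalltimeEstimator(object):
  """Hypothetical walltime-style estimator with an injectable clock.

  `clock` is any zero-argument callable returning seconds since the epoch.
  Production code would keep the default (time.time); tests pass a fake.
  """

  def __init__(self, clock=time.time):
    self._clock = clock
    self._watermark = clock()

  def observe_timestamp(self, timestamp):
    # A processing-time estimator ignores element timestamps.
    pass

  def current_watermark(self):
    # Advance to the clock's reading, but never move backwards.
    self._watermark = max(self._watermark, self._clock())
    return self._watermark

  def get_estimator_state(self):
    return self._watermark


class FakeClock(object):
  """A deterministic clock that only moves when a test advances it."""

  def __init__(self, now):
    self.now = now

  def __call__(self):
    return self.now

  def advance(self, seconds):
    self.now += seconds


class ClockedWalltimeEstimatorTest(unittest.TestCase):

  def test_watermark_follows_clock(self):
    clock = FakeClock(100.0)
    estimator = ClockedWalltimeEstimator(clock=clock)
    self.assertEqual(estimator.current_watermark(), 100.0)
    clock.advance(5)
    self.assertEqual(estimator.current_watermark(), 105.0)

  def test_watermark_never_regresses(self):
    clock = FakeClock(100.0)
    estimator = ClockedWalltimeEstimator(clock=clock)
    clock.advance(-30)  # e.g. a wall-clock adjustment
    self.assertEqual(estimator.current_watermark(), 100.0)
```

The test case runs under any standard unittest runner; the same fake-clock approach would let the other walltime tests assert concrete values too.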
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371231=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371231 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Jan/20 23:59
Start Date: 13/Jan/20 23:59
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366089412

## File path: sdks/python/apache_beam/runners/common.py
##
@@ -495,6 +525,162 @@ def invoke_process(self, windowed_value,
         self.process_method(windowed_value.value))
+class _ThreadsafeWatermarkEstimator(object):
+  """A threadsafe wrapper which wraps a WatermarkEstimator with locking
+  mechanism to guarantee multi-thread safety.
+  """
+  def __init__(self, watermark_estimator, lock):
+    if not isinstance(watermark_estimator, iobase.WatermarkEstimator):
+      raise ValueError('Initializing Threadsafe requires a WatermarkEstimator')
+    self._watermark_estimator = watermark_estimator
+    self._lock = lock
+
+  def __getattr__(self, attr):
+    if hasattr(self._watermark_estimator, attr):
+      def method_wrapper(*args, **kw):
+        with self._lock:
+          return getattr(self._watermark_estimator, attr)(*args, **kw)
+      return method_wrapper
+    raise AttributeError(attr)
+
+  def get_estimator_state(self):
+    # The caller should hold the lock before entering this function.
+    return self._watermark_estimator.get_estimator_state()
+
+  def current_watermark(self):
+    # The caller should hold the lock before entering this function.
+    return self._watermark_estimator.current_watermark()
+
+  def observe_timestamp(self, timestamp):
+    if not isinstance(timestamp, Timestamp):
+      raise ValueError('Input of observe_timestamp should be a Timestamp '
+                       'object')
+    with self._lock:
+      self._watermark_estimator.observe_timestamp(timestamp)
+
+
+class _ThreadsafeRestrictionTracker(object):
+  """A thread-safe wrapper which wraps a `RestritionTracker`.
Review comment:
```suggestion
  """A thread-safe wrapper which wraps a `RestrictionTracker`.
```

Issue Time Tracking
---

Worklog Id: (was: 371231)
Time Spent: 4h 50m (was: 4h 40m)
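The `_ThreadsafeWatermarkEstimator` in this diff serializes calls to the wrapped estimator by routing them through `__getattr__` and a shared lock. The sketch below isolates that delegation pattern under stated assumptions: `ThreadsafeWrapper`, `Counter`, and `run_demo` are hypothetical names, not Beam APIs, and the lock defaults to a fresh `threading.Lock` instead of one shared with a restriction tracker.

```python
import threading


class ThreadsafeWrapper(object):
  """Hypothetical sketch: run each method of `delegate` under one lock."""

  def __init__(self, delegate, lock=None):
    self._delegate = delegate
    self._lock = lock if lock is not None else threading.Lock()

  def __getattr__(self, name):
    # Python calls __getattr__ only for names not found on the wrapper,
    # so every delegated method goes through this path.
    attr = getattr(self._delegate, name)  # raises AttributeError if missing
    if not callable(attr):
      return attr

    def locked_call(*args, **kwargs):
      with self._lock:
        return attr(*args, **kwargs)
    return locked_call


class Counter(object):
  """A deliberately non-atomic counter used to exercise the wrapper."""

  def __init__(self):
    self.value = 0

  def increment(self):
    current = self.value      # read...
    self.value = current + 1  # ...then write: racy without a lock

  def get(self):
    return self.value


def run_demo(num_threads, increments_per_thread):
  """Increment a wrapped counter from several threads; return the total."""
  counter = ThreadsafeWrapper(Counter())

  def worker():
    for _ in range(increments_per_thread):
      counter.increment()

  threads = [threading.Thread(target=worker) for _ in range(num_threads)]
  for thread in threads:
    thread.start()
  for thread in threads:
    thread.join()
  return counter.get()
```

Because `threading.Lock` is not reentrant, a thread that already holds the lock would deadlock if it called a method that acquires it again; that is presumably why the diff's `get_estimator_state` and `current_watermark` skip locking and instead document that the caller must already hold the lock.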
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371228=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371228 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Jan/20 23:59
Start Date: 13/Jan/20 23:59
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366088537

## File path: sdks/python/apache_beam/io/watermark_estimators_test.py
##
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for built-in WatermarkEstiamtors"""

Review comment:
```suggestion
"""Unit tests for built-in WatermarkEstimators"""
```

Issue Time Tracking
---

Worklog Id: (was: 371228)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-8537) Provide WatermarkEstimatorProvider for different types of WatermarkEstimator
[ https://issues.apache.org/jira/browse/BEAM-8537?focusedWorklogId=371227=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371227 ]

ASF GitHub Bot logged work on BEAM-8537:
Author: ASF GitHub Bot
Created on: 13/Jan/20 23:59
Start Date: 13/Jan/20 23:59
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10375: [BEAM-8537] Provide WatermarkEstimator to track watermark
URL: https://github.com/apache/beam/pull/10375#discussion_r366088370

## File path: sdks/python/apache_beam/io/watermark_estimators.py
##
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A collection of WatermarkEstimator implementations that SplittableDoFns
+can use."""
+
+from __future__ import absolute_import
+
+from apache_beam.io.iobase import WatermarkEstimator
+from apache_beam.utils.timestamp import Timestamp
+
+
+class MonotonicWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkEstimator which assumes that timestamps of all ouput records
+  are increasing monotonically.
+  """
+  def __init__(self, timestamp):
+    """For a new tracking pair, the initial value
+    should be None. When resuming from residual, the initial timestamp
+    should be the same timestamp as the estimated watermark from primary.
+    """
+    self._watermark = timestamp
+
+  def observe_timestamp(self, timestamp):
+    if self._watermark is None:
+      self._watermark = timestamp
+    else:
+      if timestamp < self._watermark:
+        raise ValueError('A MonotonicWatermarkEstimator expects output '
+                         'timestamp to be increasing monotonically.')
+      self._watermark = timestamp
+
+  def current_watermark(self):
+    return self._watermark
+
+  def get_estimator_state(self):
+    return self._watermark
+
+
+class WalltimeWatermarkEstimator(WatermarkEstimator):
+  """A WatermarkExtimator which uses processing time as estimated watermark
+  simply.
+  """
+  def __init__(self, timestamp=None):
+    self._timestamp = timestamp

Review comment:
Why not set the timestamp to the current processing time when it is None? Then the current_watermark logic can drop the None case, and the check runs once in the constructor instead of on every current_watermark call.

Issue Time Tracking
---

Worklog Id: (was: 371227)
Time Spent: 4h 40m (was: 4.5h)
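A minimal sketch of the reviewer's suggestion to resolve a missing initial timestamp once, in the constructor. The diff cuts off before `current_watermark`, so this assumes it should report the later of the stored timestamp and the current processing time. `WalltimeEstimatorSketch` is a hypothetical name, and floats from `time.time()` stand in for Beam `Timestamp` objects to keep the example dependency-free.

```python
import time


class WalltimeEstimatorSketch(object):
  """Hypothetical processing-time estimator following the review comment."""

  def __init__(self, timestamp=None):
    # Resolve the missing initial value once, at construction time.
    self._timestamp = time.time() if timestamp is None else timestamp

  def observe_timestamp(self, timestamp):
    # Processing-time estimation ignores element timestamps.
    pass

  def current_watermark(self):
    # No `is None` branch is needed on this per-call path any more.
    self._timestamp = max(self._timestamp, time.time())
    return self._timestamp

  def get_estimator_state(self):
    return self._timestamp
```

One consequence of this design: a freshly constructed estimator no longer reports `None` as its state, so an assertion such as the `assertIsNone(...get_estimator_state())` check in the walltime test under review would need to change accordingly.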