[jira] [Resolved] (BEAM-6240) Allow users to annotate POJOs and JavaBeans for richer functionality
[ https://issues.apache.org/jira/browse/BEAM-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-6240. -- Resolution: Fixed Fix Version/s: 2.12.0 > Allow users to annotate POJOs and JavaBeans for richer functionality > > > Key: BEAM-6240 > URL: https://issues.apache.org/jira/browse/BEAM-6240 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Fix For: 2.12.0 > > Time Spent: 7h 50m > Remaining Estimate: 0h > > Desired annotations: > * SchemaIgnore - ignore this field > * FieldName - allow the user to explicitly specify a field name > * SchemaCreate - register a function to be used to create an object (so > fields can be final, and no default constructor need be assumed). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7287) beam_PostCommit_Java11_ValidatesRunner_Direct broken
[ https://issues.apache.org/jira/browse/BEAM-7287?focusedWorklogId=243141&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-243141 ] ASF GitHub Bot logged work on BEAM-7287: Author: ASF GitHub Bot Created on: 16/May/19 06:47 Start Date: 16/May/19 06:47 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #8584: [BEAM-7287] Change JAVA_HOME to match new Jenkins agents URL: https://github.com/apache/beam/pull/8584#issuecomment-492940374 @adude3141 Can you review it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 243141) Time Spent: 50m (was: 40m) > beam_PostCommit_Java11_ValidatesRunner_Direct broken > > > Key: BEAM-7287 > URL: https://issues.apache.org/jira/browse/BEAM-7287 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Michael Luckey >Assignee: Michal Walenia >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > Since we switched to the new Jenkins agents, > beam_PostCommit_Java11_ValidatesRunner_Direct is consistently failing [1]. > https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct/buildTimeTrend > cc [~yifanzou] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner
[ https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=243122&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-243122 ] ASF GitHub Bot logged work on BEAM-7305: Author: ASF GitHub Bot Created on: 16/May/19 05:32 Start Date: 16/May/19 05:32 Worklog Time Spent: 10m Work Description: jbartok commented on pull request #8410: [BEAM-7305] Add first version of Hazelcast Jet based Java Runner URL: https://github.com/apache/beam/pull/8410#discussion_r284543981 ## File path: runners/jet-experimental/build.gradle ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature()
+
+description = "Apache Beam :: Runners :: Hazelcast Jet"
+
+evaluationDependsOn(":sdks:java:core")
+evaluationDependsOn(":runners:core-java")
+
+project.ext {
+  jet_version = '3.0'
+  hazelcast_version = '3.12'
+}
+
+configurations {
+  validatesRunner
+}
+
+dependencies {
+  shadow project(path: ":sdks:java:core", configuration: "shadow")
+  shadow project(path: ":runners:core-java", configuration: "shadow")
+  shadow "com.hazelcast.jet:hazelcast-jet:$jet_version"
+
+  shadowTest project(path: ":sdks:java:core", configuration: "shadowTest")
+  shadowTest project(path: ":runners:core-java", configuration: "shadowTest")
+  shadowTest library.java.hamcrest_core
+  shadowTest library.java.junit
+  shadowTest "com.hazelcast.jet:hazelcast-jet-core:$jet_version:tests"
+  shadowTest "com.hazelcast:hazelcast:$hazelcast_version:tests"
+  shadowTest "com.hazelcast:hazelcast-client:$hazelcast_version:tests"
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "shadowTest")
+  validatesRunner project(path: project.path, configuration: "shadowTest")
+}
+
+task validatesRunnerBatch(type: Test) {
+  group = "Verification"
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+    "--runner=TestJetRunner",
+    "--jetGroupName=jet",
+    "--jetLocalParallelism=2"
+  ])
+
+  classpath = configurations.validatesRunner
+  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
+  useJUnit {
+    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' // impulse doesn't cooperate properly with Jet when multiple cluster members are used
+    exclude '**/SplittableDoFnTest.class' // Splittable DoFn functionality not yet in the runner
+  }
+
+  maxHeapSize = '4g'
+}
+
+task validatesRunner {
+  group = "Verification"
+  description "Validates Jet runner"
+  dependsOn validatesRunnerBatch
+}
+
+spotless {
+  java {
+    paddedCell()
Review comment: Hi guys, I'm submitting a second pull request today with more changes related to the Jet runner, and in it I'm removing this whole spotless block; it's not really needed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 243122) Time Spent: 1h (was: 50m) > Add first version of Hazelcast Jet Runner > - > > Key: BEAM-7305 > URL: https://issues.apache.org/jira/browse/BEAM-7305 > Project: Beam > Issue Type: New Feature > Components: runner-jet >Reporter: Maximilian Michels >Assignee: Jozsef Bartok >Priority: Major > Fix For: 2.14.0 > > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7332) Blog post announcing Beam Kata
Henry Suryawirawan created BEAM-7332: Summary: Blog post announcing Beam Kata Key: BEAM-7332 URL: https://issues.apache.org/jira/browse/BEAM-7332 Project: Beam Issue Type: Task Components: website Reporter: Henry Suryawirawan Fix For: Not applicable Publish a quick blog post letting users know about Beam Kata. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5510) Records including datetime to be saved as DATETIME or TIMESTAMP in BigQuery
[ https://issues.apache.org/jira/browse/BEAM-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840930#comment-16840930 ] Steven Ensslen commented on BEAM-5510: -- This defect remains in 2.12.0, and affects `datetime.date` and `datetime.time` in addition to `datetime.datetime`. I'm working around it with this:

{code:python}
class ConvertDatesToStringsFn(beam.DoFn):
    """
    Workaround for https://issues.apache.org/jira/browse/BEAM-5510
    """
    def process(self, record):
        for key in record:
            if (type(record[key]) == datetime):
                record[key] = record[key].strftime('%Y-%m-%dT%H:%M:%S')
            elif (type(record[key]) == date):
                record[key] = record[key].strftime('%Y-%m-%d')
            elif (type(record[key]) == time):
                record[key] = record[key].strftime('%H:%M:%S')
        yield record
{code}

> Records including datetime to be saved as DATETIME or TIMESTAMP in BigQuery > --- > > Key: BEAM-5510 > URL: https://issues.apache.org/jira/browse/BEAM-5510 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Affects Versions: 2.6.0 >Reporter: Pascal Gula >Priority: Major > > When trying to write a row to BigQuery that includes a Python datetime > object, the marshalling used to save the row fails. > {code:java} > File > "/home/pascal/Wks/GitHub/PEAT-AI/Albatros/venv/local/lib/python2.7/site-packages/apache_beam/internal/gcp/json_value.py", > line 124, in to_json_value > raise TypeError('Cannot convert %s to a JSON value.' % repr(obj)) > TypeError: Cannot convert datetime.datetime(2018, 9, 25, 18, 57, 18, 108579) > to a JSON value. [while running 'save/WriteToBigQuery'] > {code} > However, this is perfectly feasible, as `google-cloud-python` > has supported it since this issue was solved: > [https://github.com/GoogleCloudPlatform/google-cloud-python/issues/2957] > thanks to this pull request: > [https://github.com/GoogleCloudPlatform/google-cloud-python/pull/3426/files] > A similar approach could be taken for the `json_value.py` helper. > Is there any workaround that can be applied to solve this issue? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
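The conversion logic in the workaround above can be sketched as a plain function, independent of Beam (the function name and the isinstance-based checks here are illustrative choices, not the commenter's exact code; in a pipeline this body would live inside a `beam.DoFn.process` method):

```python
from datetime import datetime, date, time

def convert_dates_to_strings(record):
    """Render datetime/date/time values as BigQuery-friendly strings."""
    for key in record:
        value = record[key]
        # Check datetime before date: datetime is a subclass of date,
        # so these isinstance tests must run in this order.
        if isinstance(value, datetime):
            record[key] = value.strftime('%Y-%m-%dT%H:%M:%S')
        elif isinstance(value, date):
            record[key] = value.strftime('%Y-%m-%d')
        elif isinstance(value, time):
            record[key] = value.strftime('%H:%M:%S')
    return record
```

Note that using `isinstance` instead of the original `type(...) ==` comparison makes the branch order significant, since `datetime` instances would otherwise also match the `date` branch.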
[jira] [Work logged] (BEAM-6988) TypeHints Py3 Error: test_non_function (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+
[ https://issues.apache.org/jira/browse/BEAM-6988?focusedWorklogId=243039&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-243039 ] ASF GitHub Bot logged work on BEAM-6988: Author: ASF GitHub Bot Created on: 16/May/19 01:36 Start Date: 16/May/19 01:36 Worklog Time Spent: 10m Work Description: udim commented on issue #8590: [BEAM-6988] Implement a Python 3 version of getcallargs_forhints URL: https://github.com/apache/beam/pull/8590#issuecomment-492882969 run python postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 243039) Time Spent: 4h (was: 3h 50m) > TypeHints Py3 Error: test_non_function > (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+ > - > > Key: BEAM-6988 > URL: https://issues.apache.org/jira/browse/BEAM-6988 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: niklas Hansson >Priority: Major > Time Spent: 4h > Remaining Estimate: 0h > > {noformat} > Traceback (most recent call last): > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/typed_pipeline_test.py", > line 53, in test_non_function > result = ['xa', 'bbx', 'xcx'] | beam.Map(str.strip, 'x') > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 510, in _ror_ > result = p.apply(self, pvalueish, label) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/pipeline.py", > line 514, in apply > transform.type_check_inputs(pvalueish) 
> File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 753, in type_check_inputs > hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1]) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/decorators.py", > line 283, in getcallargs_forhints > raise TypeCheckError(e) > apache_beam.typehints.decorators.TypeCheckError: strip() missing 1 required > positional argument: 'chars'{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
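The fix under discussion replaces `getfullargspec`/`getcallargs` with `inspect.signature`, which since CPython 3.7 also works for many builtins such as `str.strip`; a minimal sketch of that mechanism (not the Beam code itself):

```python
import inspect

# Since CPython 3.7, Argument Clinic gives many C builtins a real
# signature, so inspect.signature succeeds where getfullargspec-based
# reflection could not previously introspect them.
sig = inspect.signature(str.strip)
params = list(sig.parameters)  # parameter names of str.strip

# Signature.bind maps supplied arguments onto parameter names, which is
# essentially what getcallargs_forhints needs to do with type hints.
# 'chars' is optional, so binding with it supplied must succeed.
bound = sig.bind('abc', 'x')
```

Binding succeeds here even though `chars` is an optional positional-only parameter, which is exactly the case the failing `test_non_function` exercised via `beam.Map(str.strip, 'x')`.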
[jira] [Work logged] (BEAM-6988) TypeHints Py3 Error: test_non_function (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+
[ https://issues.apache.org/jira/browse/BEAM-6988?focusedWorklogId=243034&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-243034 ] ASF GitHub Bot logged work on BEAM-6988: Author: ASF GitHub Bot Created on: 16/May/19 01:23 Start Date: 16/May/19 01:23 Worklog Time Spent: 10m Work Description: udim commented on issue #8590: [BEAM-6988] Implement a Python 3 version of getcallargs_forhints URL: https://github.com/apache/beam/pull/8590#issuecomment-492880511 Here's my suggestion for fixing #8530. I tried to keep changes to a minimum and behavior similar. R or CC: @NikeNano @tvalentyn @fredo838 @robertwb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 243034) Time Spent: 3h 40m (was: 3.5h) > TypeHints Py3 Error: test_non_function > (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+ > - > > Key: BEAM-6988 > URL: https://issues.apache.org/jira/browse/BEAM-6988 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: niklas Hansson >Priority: Major > Time Spent: 3h 40m > Remaining Estimate: 0h > > {noformat} > Traceback (most recent call last): > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/typed_pipeline_test.py", > line 53, in test_non_function > result = ['xa', 'bbx', 'xcx'] | beam.Map(str.strip, 'x') > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 510, in _ror_ > result = p.apply(self, pvalueish, label) > File > 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/pipeline.py", > line 514, in apply > transform.type_check_inputs(pvalueish) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 753, in type_check_inputs > hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1]) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/decorators.py", > line 283, in getcallargs_forhints > raise TypeCheckError(e) > apache_beam.typehints.decorators.TypeCheckError: strip() missing 1 required > positional argument: 'chars'{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6988) TypeHints Py3 Error: test_non_function (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+
[ https://issues.apache.org/jira/browse/BEAM-6988?focusedWorklogId=243035&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-243035 ] ASF GitHub Bot logged work on BEAM-6988: Author: ASF GitHub Bot Created on: 16/May/19 01:23 Start Date: 16/May/19 01:23 Worklog Time Spent: 10m Work Description: udim commented on issue #8590: [BEAM-6988] Implement a Python 3 version of getcallargs_forhints URL: https://github.com/apache/beam/pull/8590#issuecomment-492880663 run python postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 243035) Time Spent: 3h 50m (was: 3h 40m) > TypeHints Py3 Error: test_non_function > (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+ > - > > Key: BEAM-6988 > URL: https://issues.apache.org/jira/browse/BEAM-6988 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: niklas Hansson >Priority: Major > Time Spent: 3h 50m > Remaining Estimate: 0h > > {noformat} > Traceback (most recent call last): > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/typed_pipeline_test.py", > line 53, in test_non_function > result = ['xa', 'bbx', 'xcx'] | beam.Map(str.strip, 'x') > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 510, in _ror_ > result = p.apply(self, pvalueish, label) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/pipeline.py", > line 514, in apply > 
transform.type_check_inputs(pvalueish) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 753, in type_check_inputs > hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1]) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/decorators.py", > line 283, in getcallargs_forhints > raise TypeCheckError(e) > apache_beam.typehints.decorators.TypeCheckError: strip() missing 1 required > positional argument: 'chars'{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6988) TypeHints Py3 Error: test_non_function (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+
[ https://issues.apache.org/jira/browse/BEAM-6988?focusedWorklogId=243032&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-243032 ] ASF GitHub Bot logged work on BEAM-6988: Author: ASF GitHub Bot Created on: 16/May/19 01:18 Start Date: 16/May/19 01:18 Worklog Time Spent: 10m Work Description: udim commented on pull request #8590: [BEAM-6988] Implement a Python 3 version of getcallargs_forhints URL: https://github.com/apache/beam/pull/8590 This new version uses inspect.signature instead of getfullargspec and getcallargs (partially for now - see TODO). The difference is that version 3.7 introduces function signatures for builtins, but getcallargs has a bug with treating optional positional arguments https://bugs.python.org/issue36920 and it's deprecated. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
Post-Commit Tests Status (on master branch): [table of Jenkins build-status badge links for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners omitted]
[jira] [Work logged] (BEAM-6695) Latest transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6695?focusedWorklogId=242993&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242993 ] ASF GitHub Bot logged work on BEAM-6695: Author: ASF GitHub Bot Created on: 16/May/19 00:02 Start Date: 16/May/19 00:02 Worklog Time Spent: 10m Work Description: robinyqiu commented on issue #8206: [BEAM-6695] Latest PTransform for Python SDK URL: https://github.com/apache/beam/pull/8206#issuecomment-492860487 Hi @ttanay, I spent some time investigating the issues. I am sharing my findings here and hope they will be helpful.

1) I agree with what @aaltay said here. > In your example, the inputs you are passing are not in type Tuple[K, V]. For example, whatever test you come up with, it should be valid to do Create(elem_list) | GroupByKey(), and that input will work on PerKey transform. In other words, the action item is that we should add type annotations here

```python
@with_input_types(T)
@with_output_types(T)
class Globally(ptransform.PTransform):
  ...
```

and here

```python
@with_input_types(KV[K, V])
@with_output_types(KV[K, V])
class PerKey(ptransform.PTransform):
  ...
```

This is the intended behavior for these transforms. I know that the current tests will break after we add these annotations, but I think the right thing to do is to make these changes and fix the tests (specifically, how we create a PCollection of timestamped values/kvs using `Create`).

2) So how can we create a PCollection of kvs that have timestamps for testing? A very simple fix is to add a `Map(lambda x: x)` transform after `Create`. With this change you can also use `TimestampedValue` instead of `WindowedValue` here.
I have tried it and the following test passed:

```python
def test_per_key(self):
  l = [window.TimestampedValue(('a', 1), 300),
       window.TimestampedValue(('b', 3), 100),
       window.TimestampedValue(('a', 2), 200)]
  with TestPipeline() as p:
    pc = p | Create(l) | Map(lambda x: x)
    latest = pc | combine.Latest.PerKey()
    assert_that(latest, equal_to([('a', 1), ('b', 3)]))
```

3) The reason why this hack makes the type checking work is related to some underlying implementation details. To put it simply, both `WindowedValue` and `TimestampedValue` are special objects that can be properly handled by a `DoFn`, but `Create` won't treat them differently from other object types. By inserting a `Map` (based on `DoFn` under the hood) after `Create`, we force the data to go through a `DoFn`, so the output will be a PCollection of only the `element` type, instead of the `TimestampedValue(element, timestamp)` type. Hope this explanation makes sense to you. Let me know if you have questions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242993) Time Spent: 9h 20m (was: 9h 10m) > Latest transform for Python SDK > --- > > Key: BEAM-6695 > URL: https://issues.apache.org/jira/browse/BEAM-6695 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Ahmet Altay >Assignee: Tanay Tummalapalli >Priority: Minor > Time Spent: 9h 20m > Remaining Estimate: 0h > > Add a PTransform and Combine.CombineFn for computing the latest element in a > PCollection. > It should offer the same API as its Java counterpart: > https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
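The mechanism described in point 3 can be mimicked outside Beam with toy stand-ins (`TimestampedValue`, `create`, and `run_dofn` below are simplified assumptions for illustration, not the real Beam classes): a bare `Create` passes wrapper objects through untouched, while a `DoFn`-based step unwraps them.

```python
from typing import Any, NamedTuple

class TimestampedValue(NamedTuple):
    # Toy stand-in for beam.window.TimestampedValue.
    value: Any
    timestamp: float

def create(elems):
    # Like Create: emits the elements exactly as given, wrappers and all.
    return list(elems)

def run_dofn(fn, elems):
    # Like a DoFn executor: unwraps TimestampedValue so the user fn only
    # ever sees the payload; the timestamp travels out of band.
    out = []
    for e in elems:
        payload = e.value if isinstance(e, TimestampedValue) else e
        out.extend(fn(payload))
    return out

identity = lambda x: [x]  # Map(lambda x: x) expressed as a DoFn body

wrapped = create([TimestampedValue(('a', 1), 300)])  # still wrapped
unwrapped = run_dofn(identity, wrapped)              # plain elements
```

This is why inserting `Map(lambda x: x)` after `Create` yields a PCollection of the element type that the `PerKey` type annotations expect.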
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242985&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242985 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 23:46 Start Date: 15/May/19 23:46 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#issuecomment-492864080 I meant that the Beam documentation should have some pointers to the existence of these benchmarks and the dashboards they produce. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242985) Time Spent: 14h 20m (was: 14h 10m) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 14h 20m > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6695) Latest transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6695?focusedWorklogId=242975&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242975 ] ASF GitHub Bot logged work on BEAM-6695: Author: ASF GitHub Bot Created on: 15/May/19 23:37 Start Date: 15/May/19 23:37 Worklog Time Spent: 10m Work Description: robinyqiu commented on issue #8206: [BEAM-6695] Latest PTransform for Python SDK URL: https://github.com/apache/beam/pull/8206#issuecomment-492860487 Hi @ttanay, I spent some time investigating the issues. I am sharing my findings here and hope they will be helpful.

1) I agree with what @aaltay said here. > In your example, the inputs you are passing are not in type Tuple[K, V]. For example, whatever test you come up with, it should be valid to do Create(elem_list) | GroupByKey(), and that input will work on PerKey transform. In other words, the action item is that we should add type annotations here

```python
@with_input_types(T)
@with_output_types(T)
class Globally(ptransform.PTransform):
  ...
```

and here

```python
@with_input_types(KV[K, V])
@with_output_types(KV[K, V])
class PerKey(ptransform.PTransform):
  ...
```

This is the intended behavior for these transforms. I know that the current tests will break after we add these annotations, but I think the right thing to do is to make these changes and fix the tests (specifically, how we create a PCollection of timestamped values/kvs for testing using `Create`). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242975) Time Spent: 9h 10m (was: 9h) > Latest transform for Python SDK > --- > > Key: BEAM-6695 > URL: https://issues.apache.org/jira/browse/BEAM-6695 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Ahmet Altay >Assignee: Tanay Tummalapalli >Priority: Minor > Time Spent: 9h 10m > Remaining Estimate: 0h > > Add a PTransform and Combine.CombineFn for computing the latest element in a > PCollection. > It should offer the same API as its Java counterpart: > https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
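The requested Latest transform keeps, per window or per key, the value with the greatest timestamp. Outside of Beam, the accumulator logic a Latest CombineFn needs is just a max-by-timestamp fold; a minimal sketch (hypothetical function name, not the PR's actual code):

```python
from typing import Iterable, Tuple, TypeVar

T = TypeVar('T')


def latest(timestamped: Iterable[Tuple[float, T]]) -> T:
    """Return the value carrying the largest timestamp.

    Mirrors the fold a Latest CombineFn performs: scan the input,
    keeping only the (timestamp, value) pair that is latest so far.
    """
    best_ts, best_val = float('-inf'), None
    for ts, val in timestamped:
        if ts >= best_ts:  # ties resolve to the later arrival
            best_ts, best_val = ts, val
    return best_val


events = [(1.0, 'a'), (3.0, 'c'), (2.0, 'b')]
assert latest(events) == 'c'
```

In the actual transform, the timestamps come from the elements' event-time metadata rather than being carried explicitly in the values, and the fold runs as a combiner so it can be applied per key and merged across bundles.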
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242951 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 23:11 Start Date: 15/May/19 23:11 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#issuecomment-492856951 Thank you @tvalentyn. To your question, results are stored in a BigQuery table, which should be accessible from the UI or the gcloud command line. Each job is likely to use its own table, configured in `testConfigurations`, so it's not possible to provide a static link in the groovy file. I'll write up a document for the Python performance tests and include this info. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242951) Time Spent: 14h 10m (was: 14h) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 14h 10m > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-7331) Missing util function for late pane in java PAssert
[ https://issues.apache.org/jira/browse/BEAM-7331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Cwik closed BEAM-7331. --- Resolution: Fixed Fix Version/s: 2.14.0 > Missing util function for late pane in java PAssert > > > Key: BEAM-7331 > URL: https://issues.apache.org/jira/browse/BEAM-7331 > Project: Beam > Issue Type: Improvement > Components: testing >Reporter: Ruoyun Huang >Assignee: Ruoyun Huang >Priority: Minor > Fix For: 2.14.0 > > Time Spent: 20m > Remaining Estimate: 0h > > coming from a user's question: > [https://stackoverflow.com/questions/56132551/apache-beam-teststream-finalpane-not-firing-as-expected] > There are util functions for all types of Panes, except for LatePane. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7331) Missing util function for late pane in java PAssert
[ https://issues.apache.org/jira/browse/BEAM-7331?focusedWorklogId=242941&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242941 ] ASF GitHub Bot logged work on BEAM-7331: Author: ASF GitHub Bot Created on: 15/May/19 23:03 Start Date: 15/May/19 23:03 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #8587: [BEAM-7331] Create PAssert util function for late panes. URL: https://github.com/apache/beam/pull/8587 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242941) Time Spent: 20m (was: 10m) > Missing util function for late pane in java PAssert > > > Key: BEAM-7331 > URL: https://issues.apache.org/jira/browse/BEAM-7331 > Project: Beam > Issue Type: Improvement > Components: testing >Reporter: Ruoyun Huang >Assignee: Ruoyun Huang >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > coming from a user's question: > [https://stackoverflow.com/questions/56132551/apache-beam-teststream-finalpane-not-firing-as-expected] > There are util functions for all types of Panes, except for LatePane. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7331) Missing util function for late pane in java PAssert
[ https://issues.apache.org/jira/browse/BEAM-7331?focusedWorklogId=242927&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242927 ] ASF GitHub Bot logged work on BEAM-7331: Author: ASF GitHub Bot Created on: 15/May/19 23:01 Start Date: 15/May/19 23:01 Worklog Time Spent: 10m Work Description: HuangLED commented on issue #8587: [BEAM-7331]Create PAssert util function for late panes. URL: https://github.com/apache/beam/pull/8587#issuecomment-492855158 R: @lukecwik updated with new unit test case(s) in PaneExtractorTest. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242927) Time Spent: 10m Remaining Estimate: 0h > Missing util function for late pane in java PAssert > > > Key: BEAM-7331 > URL: https://issues.apache.org/jira/browse/BEAM-7331 > Project: Beam > Issue Type: Improvement > Components: testing >Reporter: Ruoyun Huang >Assignee: Ruoyun Huang >Priority: Minor > Time Spent: 10m > Remaining Estimate: 0h > > coming from a user's question: > [https://stackoverflow.com/questions/56132551/apache-beam-teststream-finalpane-not-firing-as-expected] > There are util functions for all types of Panes, except for LatePane. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7283) Have javadoc offline link dependency versions bound to versions within BeamModulePlugin.groovy
[ https://issues.apache.org/jira/browse/BEAM-7283?focusedWorklogId=242926&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242926 ] ASF GitHub Bot logged work on BEAM-7283: Author: ASF GitHub Bot Created on: 15/May/19 22:56 Start Date: 15/May/19 22:56 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #8588: [BEAM-7283] Update aggregated javadoc versions during linking to point to javadoc.io URL: https://github.com/apache/beam/pull/8588#issuecomment-492854273 R: @angoenka This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242926) Time Spent: 20m (was: 10m) > Have javadoc offline link dependency versions bound to versions within > BeamModulePlugin.groovy > -- > > Key: BEAM-7283 > URL: https://issues.apache.org/jira/browse/BEAM-7283 > Project: Beam > Issue Type: Improvement > Components: website >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > Make Javadocs offline link dependency versions pull versions by using code > instead of hardcoded versions. > Code location to update: > https://github.com/apache/beam/blob/abece47cc1c1c88a519e54e67a2d358b439cf69c/sdks/java/javadoc/build.gradle#L78 > Can be updated to use the following: > [dependencies.create(project.library.java.google_api_client).getVersion()|https://github.com/apache/beam/blob/abece47cc1c1c88a519e54e67a2d358b439cf69c/sdks/java/maven-archetypes/examples/build.gradle#L29] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7283) Have javadoc offline link dependency versions bound to versions within BeamModulePlugin.groovy
[ https://issues.apache.org/jira/browse/BEAM-7283?focusedWorklogId=242925&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242925 ] ASF GitHub Bot logged work on BEAM-7283: Author: ASF GitHub Bot Created on: 15/May/19 22:56 Start Date: 15/May/19 22:56 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #8588: [BEAM-7283] Update aggregated javadoc versions during linking to point to javadoc.io URL: https://github.com/apache/beam/pull/8588 Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242918&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242918 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:48 Start Date: 15/May/19 22:48 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284481464 ## File path: sdks/python/apache_beam/transforms/stats.py ## @@ -0,0 +1,205 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+"""This module has all statistic related transforms."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import math
+import sys
+from builtins import round
+
+import mmh3
+
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+__all__ = [
+    'ApproximateUniqueGlobally',
+    'ApproximateUniquePerKey',
+]
+
+
+class ApproximateUniqueGlobally(PTransform):
+  """
+  Hashes input elements and uses those to extrapolate the size of the entire
+  set of hash values by assuming the rest of the hash values are as densely
+  distributed as the sample space.
+
+  Args:
+    **kwargs: Accepts a single named argument "size" or "error".
+    size: an int not smaller than 16, which we would use to estimate
+      number of unique values.
+    error: max estimation error, which is a float between 0.01
+      and 0.50. If error is given, size will be calculated from error with
+      _get_sample_size_from_est_error function.
+  """
+
+  _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+  _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+                         'Received {size = %s, error = %s}.'
+  _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+                        '<= 0.50. In general, the estimation error is about ' \
+                        '2 / sqrt(sample_size). Received {size = %s}.'
+  _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+                         'between 0.01 and 0.50. Received {error = %s}.' 
+
+  def __init__(self, size=None, error=None):
+
+    if None not in (size, error):
+      raise ValueError(self._MULTI_VALUE_ERR_MSG % (size, error))
+    elif size is None and error is None:
+      raise ValueError(self._NO_VALUE_ERR_MSG)
+    elif size is not None:
+      if not isinstance(size, int) or size < 16:
+        raise ValueError(self._INPUT_SIZE_ERR_MSG % (size))
+      else:
+        self._sample_size = size
+        self._max_est_err = None
+    else:
+      if error < 0.01 or error > 0.5:
+        raise ValueError(self._INPUT_ERROR_ERR_MSG % (error))
+      else:
+        self._sample_size = self._get_sample_size_from_est_error(error)
+        self._max_est_err = error
+
+  def expand(self, pcoll):
+    return pcoll \
+        | 'CountGlobalUniqueValues' \
+        >> (CombineGlobally(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+  @staticmethod
+  def _get_sample_size_from_est_error(est_err):
+    """
+    :return: sample size
+
+    Calculate sample size from estimation error
+    """
+    # math.ceil in python 2.7 returns float, while it returns int in python 3.
+    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))
+
+
+class ApproximateUniquePerKey(ApproximateUniqueGlobally):
+
+  def expand(self, pcoll):
+    return pcoll \
+        | 'CountPerKeyUniqueValues' \
+        >> (CombinePerKey(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+
+class _LargestUnique(object):
+  """
+  An object to keep samples and calculate sample hash space. It is an
+  accumulator of a combine function.
+  """
+  _HASH_SPACE_SIZE = 2.0 * sys.maxsize
+
+  def __init__(self, sample_size):
+    self._sample_size = sample_size
+    self._min_hash = sys.maxsize
+    self._sample_heap = []
+    self._sample_set = set()
+
+  def add(self, element):
+    """
+    :param an element from pcoll.
+    :return: boolean type whether the v
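The `_get_sample_size_from_est_error` helper in the diff inverts the rule of thumb quoted in `_INPUT_SIZE_ERR_MSG`: the error is about `2 / sqrt(sample_size)`, so the required sample size is `ceil(4 / error**2)`. A standalone check of the boundary cases the validation logic allows (the helper name mirrors the diff but this snippet is otherwise illustrative):

```python
import math


def sample_size_from_error(est_err: float) -> int:
    # Same arithmetic as _get_sample_size_from_est_error above:
    # error ~ 2 / sqrt(n)  =>  n = ceil(4 / error^2).
    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))


# The 0.50 error ceiling corresponds to the minimum allowed size of 16,
# while the 0.01 error floor requires a 40,000-element sample.
assert sample_size_from_error(0.5) == 16
assert sample_size_from_error(0.01) == 40000
```

This is why the validator rejects sizes below 16: a smaller sample cannot guarantee even the loosest (0.50) error bound.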
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242916&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242916 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:48 Start Date: 15/May/19 22:48 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284481299 ## File path: sdks/python/apache_beam/transforms/stats.py ## @@ -0,0 +1,206 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import logging
+import math
+import sys
+from builtins import round
+
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+try:
+  import mmh3
+except ImportError:
+  logging.info('Python version >=3.0 uses buildin hash function.')
+
+__all__ = [
+    'ApproximateUniqueGlobally',
+    'ApproximateUniquePerKey',
+]
+
+
+class ApproximateUniqueGlobally(PTransform):
+  """
+  Hashes input elements and uses those to extrapolate the size of the entire
+  set of hash values by assuming the rest of the hash values are as densely
+  distributed as the sample space.
+
+  Args:
+    **kwargs: Accepts a single named argument "size" or "error".
+    size: an int not smaller than 16, which we would use to estimate
+      number of unique values.
+    error: max estimation error, which is a float between 0.01
+      and 0.50. If error is given, size will be calculated from error with
+      _get_sample_size_from_est_error function.
+  """
+
+  _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+  _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+                         'Received {size = %s, error = %s}.'
+  _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+                        '<= 0.50. In general, the estimation error is about ' \
+                        '2 / sqrt(sample_size). Received {size = %s}.'
+  _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+                         'between 0.01 and 0.50. Received {error = %s}.' 
+ + def __init__(self, **kwargs): +input_size = kwargs.pop('size', None) +input_err = kwargs.pop('error', None) + +if None not in (input_size, input_err): + raise ValueError(self._MULTI_VALUE_ERR_MSG % (input_size, input_err)) +elif input_size is None and input_err is None: + raise ValueError(self._NO_VALUE_ERR_MSG) +elif input_size is not None: + if not isinstance(input_size, int) or input_size < 16: +raise ValueError(self._INPUT_SIZE_ERR_MSG % (input_size)) + else: +self._sample_size = input_size +self._max_est_err = None +else: + if input_err < 0.01 or input_err > 0.5: +raise ValueError(self._INPUT_ERROR_ERR_MSG % (input_err)) + else: +self._sample_size = self._get_sample_size_from_est_error(input_err) +self._max_est_err = input_err + + def expand(self, pcoll): +return pcoll \ + | 'CountGlobalUniqueValues' \ + >> (CombineGlobally(ApproximateUniqueCombineDoFn(self._sample_size))) + + @staticmethod + def _get_sample_size_from_est_error(est_err): +""" +:return: sample size + +Calculate sample size from estimation error +""" +return int(math.ceil(4.0 / math.pow(est_err, 2.0))) + + +class ApproximateUniquePerKey(ApproximateUniqueGlobally): + + def expand(self, pcoll): +return pcoll \ + | 'CountPerKeyUniqueValues' \ + >> (CombinePerKey(ApproximateUniqueCombineDoFn(self._sample_size))) + + +class _LargestUnique(object): + """ + An object to keep samples and calculate sample hash space. It is an + accumulator of a combine function. + """ + _HASH_SPACE_SIZE = 2.0 * sys.maxsize + + def __init__(self, sample_size): +self._sample_size = sample_size +
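The size/error relationship encoded in `_get_sample_size_from_est_error` and the `_INPUT_SIZE_ERR_MSG` text above can be sanity-checked with a short standalone sketch (plain Python, not Beam code; the function names here merely mirror the helpers in the diff):

```python
import math

def sample_size_from_error(est_err):
    # size = ceil(4 / err^2), as in _get_sample_size_from_est_error above;
    # math.ceil returns a float on Python 2, so cast to int.
    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))

def error_from_sample_size(size):
    # The estimation error is roughly 2 / sqrt(sample_size), which is why
    # a size >= 16 is required for an error <= 0.50.
    return 2.0 / math.sqrt(size)

print(sample_size_from_error(0.5))   # 16 -- the smallest allowed size
print(sample_size_from_error(0.01))  # 40000 -- the smallest allowed error
print(error_from_sample_size(16))    # 0.5
```

Note how the two formulas are inverses of each other up to the ceiling, so validating `error` in [0.01, 0.50] is equivalent to validating `size` in [16, 40000].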
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242917 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:48 Start Date: 15/May/19 22:48 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284481331
## File path: sdks/python/apache_beam/transforms/stats.py
## @@ -0,0 +1,205 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module has all statistic related transforms."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import math
+import sys
+from builtins import round
+
+import mmh3
+
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+__all__ = [
+    'ApproximateUniqueGlobally',
+    'ApproximateUniquePerKey',
+]
+
+
+class ApproximateUniqueGlobally(PTransform):
+  """
+  Hashes input elements and uses those to extrapolate the size of the entire
+  set of hash values by assuming the rest of the hash values are as densely
+  distributed as the sample space.
+
+  Args:
+    **kwargs: Accepts a single named argument "size" or "error".
+    size: an int not smaller than 16, which we would use to estimate
+      number of unique values.
+    error: max estimation error, which is a float between 0.01
+      and 0.50. If error is given, size will be calculated from error with
+      _get_sample_size_from_est_error function.
+  """
+
+  _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+  _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+                         'Received {size = %s, error = %s}.'
+  _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+                        '<= 0.50. In general, the estimation error is about ' \
+                        '2 / sqrt(sample_size). Received {size = %s}.'
+  _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+                         'between 0.01 and 0.50. Received {error = %s}.'
+
+  def __init__(self, size=None, error=None):
+
+    if None not in (size, error):
+      raise ValueError(self._MULTI_VALUE_ERR_MSG % (size, error))
+    elif size is None and error is None:
+      raise ValueError(self._NO_VALUE_ERR_MSG)
+    elif size is not None:
+      if not isinstance(size, int) or size < 16:
+        raise ValueError(self._INPUT_SIZE_ERR_MSG % (size))
+      else:
+        self._sample_size = size
+        self._max_est_err = None
+    else:
+      if error < 0.01 or error > 0.5:
+        raise ValueError(self._INPUT_ERROR_ERR_MSG % (error))
+      else:
+        self._sample_size = self._get_sample_size_from_est_error(error)
+        self._max_est_err = error
+
+  def expand(self, pcoll):
+    return pcoll \
+           | 'CountGlobalUniqueValues' \
+           >> (CombineGlobally(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+  @staticmethod
+  def _get_sample_size_from_est_error(est_err):
+    """
+    :return: sample size
+
+    Calculate sample size from estimation error
+    """
+    # math.ceil in python 2.7 returns float, while it returns int in python 3.
+    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))
+
+
+class ApproximateUniquePerKey(ApproximateUniqueGlobally):
+
+  def expand(self, pcoll):
+    return pcoll \
+           | 'CountPerKeyUniqueValues' \
+           >> (CombinePerKey(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+
+class _LargestUnique(object):
+  """
+  An object to keep samples and calculate sample hash space. It is an
+  accumulator of a combine function.
+  """
+  _HASH_SPACE_SIZE = 2.0 * sys.maxsize
+
+  def __init__(self, sample_size):
+    self._sample_size = sample_size
+    self._min_hash = sys.maxsize
+    self._sample_heap = []
+    self._sample_set = set()
+
+  def add(self, element):
+    """
+    :param an element from pcoll.
+    :return: boolean type whether the v
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242915&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242915 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:47 Start Date: 15/May/19 22:47 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284481067
## File path: sdks/python/apache_beam/transforms/stats_test.py
## @@ -0,0 +1,384 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import division
+
+import math
+import random
+import unittest
+from collections import defaultdict
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+class ApproximateUniqueTest(unittest.TestCase):
+  """Unit tests for ApproximateUniqueGlobally and ApproximateUniquePerKey."""
+
+  def test_approximate_unique_global_by_invalid_size(self):
+    # test if the transformation throws an error as expected with an invalid
+    # small input size (< 16).
+    sample_size = 10
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create'
+           >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUniqueGlobally(size=sample_size))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUniqueGlobally._INPUT_SIZE_ERR_MSG % (
+        sample_size)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_type_size(self):
+    # test if the transformation throws an error as expected with an invalid
+    # type of input size (not int).
+    sample_size = 100.0
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create' >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUniqueGlobally(size=sample_size))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUniqueGlobally._INPUT_SIZE_ERR_MSG % (
+        sample_size)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_small_error(self):
+    # test if the transformation throws an error as expected with an invalid
+    # small input error (< 0.01).
+    est_err = 0.0
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create' >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUniqueGlobally(error=est_err))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUniqueGlobally._INPUT_ERROR_ERR_MSG % (
+        est_err)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_big_error(self):
+    # test if the transformation throws an error as expected with an invalid
+    # big input error (> 0.50).
+    est_err = 0.6
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create' >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUniqueGlobally(error=est_err))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUniqueGlobally._INPUT_ERROR_ERR_MSG % (
+        est_err)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_no_input(self):
+    # test if the transformation throws an error as expected with no input.
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(Valu
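The failure modes these tests assert can be reproduced outside Beam with a small standalone sketch (assumption: `validate_unique_args` is a hypothetical helper that mirrors the constructor checks from the stats.py diff; it is not a Beam API):

```python
def validate_unique_args(size=None, error=None):
    # Mirrors the constructor checks shown in the stats.py diff: exactly one
    # of size/error, size an int >= 16, error within [0.01, 0.50].
    if size is not None and error is not None:
        raise ValueError('Either size or error should be set, not both.')
    if size is None and error is None:
        raise ValueError('Either size or error should be set.')
    if size is not None and (not isinstance(size, int) or size < 16):
        raise ValueError('size must be an int >= 16, got %r' % (size,))
    if error is not None and not 0.01 <= error <= 0.5:
        raise ValueError('error must be in [0.01, 0.50], got %r' % (error,))

# The same invalid inputs the tests above use are all rejected:
for kwargs in ({'size': 10}, {'size': 100.0}, {'error': 0.0}, {'error': 0.6}, {}):
    try:
        validate_unique_args(**kwargs)
        print(kwargs, 'accepted')
    except ValueError as exc:
        print(kwargs, 'rejected:', exc)
```

Note that `size=100.0` is rejected purely on type grounds, matching `test_approximate_unique_global_by_invalid_type_size` above.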
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242906&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242906 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:37 Start Date: 15/May/19 22:37 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284472831
## File path: sdks/python/setup.py
## @@ -125,6 +125,7 @@ def get_version():
     'pyvcf>=0.6.8,<0.7.0; python_version < "3.0"',
     'pyyaml>=3.12,<4.0.0',
     'typing>=3.6.0,<3.7.0; python_version < "3.5.0"',
+    'mmh3>=2.5.1; python_version >= "2.7"',
Review comment: Thanks for asking this question. I was able to dig in more and found an unexpected reason. Since 3.4, Python has used the SipHash algorithm; before that, it used the FNV algorithm. Though SipHash reduces collisions, I don't think that effect is noticeable in my test. So I looked at how the hash values are distributed. The following graphs show how hash values are distributed by different hash algorithms given exactly the same data set. The builtin hash function on Python 2 is not as uniformly distributed as the other hash functions (the builtin hash function on Python 3, and mmh3). Though it's not obvious from the graphs that hash values from mmh3 are more uniformly distributed than those from the builtin hash function on Python 3, test results show that mmh3 produces more uniformly distributed hash values. ![mmh3](https://user-images.githubusercontent.com/16039146/57813458-04d3de80-7725-11e9-8ffd-f75c2ba5f057.png) ![py2](https://user-images.githubusercontent.com/16039146/57813459-04d3de80-7725-11e9-8da7-6f27f4f06f75.png) ![py3](https://user-images.githubusercontent.com/16039146/57813460-056c7500-7725-11e9-8d2a-640b47e0f823.png) Why does it matter to approximate unique count?
We calculate the sample space from the hash values, and then calculate the population space from the sample space. More uniformly distributed hash values give us a more accurate sample space, and thus a more accurate population space. The difference is more obvious when the input has more duplicate records. A test case is generating 1 elements within range of [0, 1000]. This test consistently fails if we use the builtin hash function with py2.7, sometimes fails with the builtin hash function with py3, and rarely (< 1%) fails with mmh3. (Definition of failure: estimation error > 2 / sqrt(sample_size).) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242906) Time Spent: 6h 40m (was: 6.5h) > ApproximateUnique transform for Python SDK > -- > > Key: BEAM-6693 > URL: https://issues.apache.org/jira/browse/BEAM-6693 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Ahmet Altay >Assignee: Hannah Jiang >Priority: Minor > Time Spent: 6h 40m > Remaining Estimate: 0h > > Add a PTransform for estimating the number of distinct elements in a > PCollection and the number of distinct values associated with each key in a > PCollection KVs. > it should offer the same API as its Java counterpart: > https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
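The sample-space argument above can be made concrete with a small k-minimum-values sketch. This is an illustrative estimator in the same spirit as the PR's combiner, not the Beam implementation; `hashlib.md5` stands in for mmh3 so the example has no third-party dependency:

```python
import hashlib

def kmv_estimate(elements, k=64):
    # Keep the k smallest *distinct* hash values, mapped into [0, 1).
    # If hashes are uniformly distributed, the k-th smallest value h_k sits
    # at roughly k / n for n distinct elements, so n ~= (k - 1) / h_k.
    # Duplicates hash identically, so the set collapses them automatically.
    hashes = set()
    for e in elements:
        h = int(hashlib.md5(str(e).encode()).hexdigest(), 16) / float(2 ** 128)
        hashes.add(h)
    smallest = sorted(hashes)[:k]
    if len(smallest) < k:
        return len(smallest)  # fewer than k distinct values: count is exact
    return int((k - 1) / smallest[-1])

# 10000 inputs but only 1000 distinct values, mirroring the duplicate-heavy
# test data described above; the estimate lands near the true count of 1000.
data = [i % 1000 for i in range(10000)]
print(kmv_estimate(data, k=64))
```

A skewed (non-uniform) hash function biases `smallest[-1]` and therefore the estimate, which is exactly why the comment above cares about how uniformly Python 2's builtin hash distributes its outputs.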
[jira] [Commented] (BEAM-7322) PubSubIO watermark does not advance for very low volumes
[ https://issues.apache.org/jira/browse/BEAM-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840834#comment-16840834 ] Tim Sell commented on BEAM-7322: Further explanation of the data.json I uploaded: I set my job to use 1-minute fixed windows. It consumes, windows, and counts different pubsub subscriptions separately. They had publishing frequencies of 10, 5, 1 and 0.5 seconds. After about 10 minutes I had on_time and late panes for the 5, 1 and 0.5 second streams, but none for the 10 second stream. > PubSubIO watermark does not advance for very low volumes > > > Key: BEAM-7322 > URL: https://issues.apache.org/jira/browse/BEAM-7322 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Tim Sell >Priority: Minor > Attachments: data.json > > > I have identified an issue where the watermark does not advance when using > the beam PubSubIO when volumes are very low. > I have created a mini example project to demonstrate the behaviour with a > python script for generating messages at different frequencies: > https://github.com/tims/beam/tree/master/pubsub-watermark > [note: this is in a directory of a Beam fork for corp hoop jumping > convenience on my end, it is not intended for merging]. > The behaviour is easily replicated if you apply a fixed window triggering > after the watermark passes the end of the window.
> {code}
> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>     .apply(ParDo.of(new ParseScoreEventFn()))
>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.standardSeconds(60))
>         .discardingFiredPanes())
>     .apply(MapElements.into(kvs(strings(), integers()))
>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>     .apply(Count.perKey())
>     .apply(ParDo.of(Log.of("counted per key")));
> {code}
> With this triggering, using both the Flink local runner and the direct runner, panes will be fired after a long delay (minutes) for low frequencies of messages in pubsub (seconds). The biggest issue is that it seems no panes will ever be emitted if you just send a few events and stop. This is particularly likely to trip up people new to Beam. > If I change the triggering to have early firings I get exactly the emitted panes that you would expect.
> {code}
> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .alignedTo(Duration.standardSeconds(60))))
>     .withAllowedLateness(Duration.standardSeconds(60))
>     .discardingFiredPanes())
> {code}
> I can use any variation of early firing triggers and they work as expected. > We believe that the watermark is not advancing when the volume is too low because of the sampling that PubSubIO does to determine its watermark. It just never has a large enough sample. > This problem occurs in the direct runner and flink runner, but not in the dataflow runner (because dataflow uses its own PubSubIO; dataflow has access to internal details of pubsub and so doesn't need to do any sampling).
> For extra context from the user@ list: > *Kenneth Knowles:* > Thanks to your info, I think it is the configuration of MovingFunction [1] that is the likely culprit, but I don't totally understand why. It is configured like so: > - store 60 seconds of data > - update data every 5 seconds > - require at least 10 messages to be 'significant' > - require messages from at least 2 distinct 5 second update periods to be 'significant' > I would expect a rate of 1 message per second to satisfy this. I may have read something wrong. > Have you filed an issue in Jira [2]? > Kenn > [1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508 > [2] https://issues.apache.org/jira/projects/BEAM/issues > *Alexey Romanenko:* > Not sure that this can be very helpful, but I recall a similar issue with KinesisIO [1] [2], and it was a bug in MovingFunction which was fixed. > [1] https://issues.apache.org/jira/browse/BEAM-5063 > [2] https://github.com/apache/beam/pull/6178 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
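The thresholds Kenn lists can be modeled with a short standalone sketch (a simplification for illustration only, not the actual `MovingFunction` class from `PubsubUnboundedSource`). It shows why a message every 10 seconds never becomes "significant" while one per second does:

```python
BUCKET_SECONDS = 5    # update period
WINDOW_SECONDS = 60   # how much history is kept
MIN_MESSAGES = 10     # minimum samples to be 'significant'
MIN_BUCKETS = 2       # minimum distinct update periods

def is_significant(arrival_times):
    """arrival_times: message arrival timestamps in seconds."""
    # Keep only the last 60 seconds of samples.
    recent = [t for t in arrival_times if t >= max(arrival_times) - WINDOW_SECONDS]
    # Count how many distinct 5-second update periods are represented.
    buckets = {int(t // BUCKET_SECONDS) for t in recent}
    return len(recent) >= MIN_MESSAGES and len(buckets) >= MIN_BUCKETS

# One message every 10 seconds: only 6 messages fit in a 60s window, so the
# >= 10 message threshold is never met and the watermark estimate stalls.
print(is_significant([0, 10, 20, 30, 40, 50]))  # False
# One message per second easily qualifies, matching Kenn's expectation.
print(is_significant(list(range(60))))          # True
```

Under this simplified model, any steady rate below one message per six seconds can never accumulate ten samples in the window, which matches Tim's observation that only the 10-second stream produced no panes.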
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242902&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242902 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:21 Start Date: 15/May/19 22:21 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284474021 ## File path: sdks/python/setup.py ## @@ -125,6 +125,7 @@ def get_version(): 'pyvcf>=0.6.8,<0.7.0; python_version < "3.0"', 'pyyaml>=3.12,<4.0.0', 'typing>=3.6.0,<3.7.0; python_version < "3.5.0"', +'mmh3>=2.5.1; python_version >= "2.7"', Review comment: ![mmh3](https://user-images.githubusercontent.com/16039146/57813427-e66de300-7724-11e9-90d0-432b465aedad.png) ![py2](https://user-images.githubusercontent.com/16039146/57813429-e66de300-7724-11e9-8d6a-c7577dcb540a.png) ![py3](https://user-images.githubusercontent.com/16039146/57813430-e66de300-7724-11e9-80d1-7220cf60af8a.png) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242902) Time Spent: 6.5h (was: 6h 20m) > ApproximateUnique transform for Python SDK > -- > > Key: BEAM-6693 > URL: https://issues.apache.org/jira/browse/BEAM-6693 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Ahmet Altay >Assignee: Hannah Jiang >Priority: Minor > Time Spent: 6.5h > Remaining Estimate: 0h > > Add a PTransform for estimating the number of distinct elements in a > PCollection and the number of distinct values associated with each key in a > PCollection KVs. 
> it should offer the same API as its Java counterpart: > https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242901&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242901 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:20 Start Date: 15/May/19 22:20 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284472831
## File path: sdks/python/setup.py
## @@ -125,6 +125,7 @@ def get_version():
     'pyvcf>=0.6.8,<0.7.0; python_version < "3.0"',
     'pyyaml>=3.12,<4.0.0',
     'typing>=3.6.0,<3.7.0; python_version < "3.5.0"',
+    'mmh3>=2.5.1; python_version >= "2.7"',
Review comment: Thanks for asking this question. I was able to dig in more and found an unexpected reason. Since 3.4, Python has used the SipHash algorithm; before that, it used the FNV algorithm. Though SipHash reduces collisions, I don't think that effect is noticeable in my test. So I looked at how the hash values are distributed. The following graphs show how hash values are distributed by different hash algorithms given exactly the same data set. The builtin hash function on Python 2 is not as uniformly distributed as the other hash functions (the builtin hash function on Python 3, and mmh3). Though it's not obvious from the graphs that hash values from mmh3 are more uniformly distributed than those from the builtin hash function on Python 3, test results show that mmh3 produces more uniformly distributed hash values. ![mmh3](https://user-images.githubusercontent.com/16039146/57813458-04d3de80-7725-11e9-8ffd-f75c2ba5f057.png) ![py2](https://user-images.githubusercontent.com/16039146/57813459-04d3de80-7725-11e9-8da7-6f27f4f06f75.png) ![py3](https://user-images.githubusercontent.com/16039146/57813460-056c7500-7725-11e9-8d2a-640b47e0f823.png) Why does it matter to approximate unique count?
We calculate the sample space from the hash values, and then calculate the population space from the sample space. More uniformly distributed hash values give us a more accurate sample space, and thus a more accurate population space. The difference is more obvious when the input has more duplicate records. A test case is generating 1 elements within range of [0, 1000]. This test consistently fails if we use the builtin hash function with py2.7, sometimes fails with the builtin hash function with py3, and hasn't failed with mmh3. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242901) Time Spent: 6h 20m (was: 6h 10m) > ApproximateUnique transform for Python SDK > -- > > Key: BEAM-6693 > URL: https://issues.apache.org/jira/browse/BEAM-6693 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Ahmet Altay >Assignee: Hannah Jiang >Priority: Minor > Time Spent: 6h 20m > Remaining Estimate: 0h > > Add a PTransform for estimating the number of distinct elements in a > PCollection and the number of distinct values associated with each key in a > PCollection KVs. > it should offer the same API as its Java counterpart: > https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242898&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242898 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:16 Start Date: 15/May/19 22:16 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284472831
## File path: sdks/python/setup.py
## @@ -125,6 +125,7 @@ def get_version():
     'pyvcf>=0.6.8,<0.7.0; python_version < "3.0"',
     'pyyaml>=3.12,<4.0.0',
     'typing>=3.6.0,<3.7.0; python_version < "3.5.0"',
+    'mmh3>=2.5.1; python_version >= "2.7"',
Review comment: Thanks for asking this question. I was able to dig in more and found an unexpected reason. Since 3.4, Python has used the SipHash algorithm; before that, it used the FNV algorithm. Though SipHash reduces collisions, I don't think that effect is noticeable in my test. So I looked at how the hash values are distributed. The graphs at https://docs.google.com/spreadsheets/d/1QpNAorUhUY1Nq3b4QRMfbB9Rupq3ZC9FshunIyRRfH0/edit#gid=0 show how hash values are distributed by different hash algorithms given exactly the same data set. The builtin hash function on Python 2 is not as uniformly distributed as the other hash functions (the builtin hash function on Python 3, and mmh3). Though it's not obvious from the graphs that hash values from mmh3 are more uniformly distributed than those from the builtin hash function on Python 3, test results show that mmh3 produces more uniformly distributed hash values. Why does it matter to approximate unique count? We calculate the sample space from the hash values, and then calculate the population space from the sample space. More uniformly distributed hash values give us a more accurate sample space, and thus a more accurate population space. The difference is more obvious when the input has more duplicate records.
A test case is generating 1 elements within range of [0, 1000]. This test consistently fails if we use the builtin hash function with py2.7, sometimes fails with the builtin hash function with py3, and hasn't failed with mmh3. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242898) Time Spent: 6h (was: 5h 50m) > ApproximateUnique transform for Python SDK > -- > > Key: BEAM-6693 > URL: https://issues.apache.org/jira/browse/BEAM-6693 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Ahmet Altay >Assignee: Hannah Jiang >Priority: Minor > Time Spent: 6h > Remaining Estimate: 0h > > Add a PTransform for estimating the number of distinct elements in a > PCollection and the number of distinct values associated with each key in a > PCollection KVs. > it should offer the same API as its Java counterpart: > https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242897 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 22:15 Start Date: 15/May/19 22:15 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284472831 ## File path: sdks/python/setup.py ## @@ -125,6 +125,7 @@ def get_version(): 'pyvcf>=0.6.8,<0.7.0; python_version < "3.0"', 'pyyaml>=3.12,<4.0.0', 'typing>=3.6.0,<3.7.0; python_version < "3.5.0"', +'mmh3>=2.5.1; python_version >= "2.7"', Review comment: Thanks for asking this question. I was able to dig into more and found an unexpected reason. From 3.4, Python used SpiHash algorithm, before that, it used FNV algorithm. Though SpiHash reduced collision, I don't think it's noticeable from my test. So I saw how hash values are distributed. Graphs at https://docs.google.com/spreadsheets/d/1QpNAorUhUY1Nq3b4QRMfbB9Rupq3ZC9FshunIyRRfH0/edit#gid=0 shows how hash values are distributed with different hash algorithm given exact the same data set. Builtin hash function with Python 2 is not as uniformly distributed as other hash functions (builtin hash function with Python 3 and mmh3). Though it's not obvious from the graphs that hash values from mmh3 algorithms are more uniformed distributed than the ones from builtin hash function with python 3, test results shows that mmh3 has more uniformly distributed hash values. Why does it matter to approximate unique count? We are calculating sample space with hash values and calculate population space from the sample space. More uniformly distributed hash values give us more accurate sample space thus we can calculate more accurate population space. The difference is more obvious when we have more duplicate records with input. 
A test case generates elements within the range [0, 1000]. This test consistently fails if we use the builtin hash function with Python 2.7, fails sometimes with the builtin hash function with Python 3, and has not failed with mmh3. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242897) Time Spent: 5h 50m (was: 5h 40m) > ApproximateUnique transform for Python SDK > -- > > Key: BEAM-6693 > URL: https://issues.apache.org/jira/browse/BEAM-6693 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Ahmet Altay >Assignee: Hannah Jiang >Priority: Minor > Time Spent: 5h 50m > Remaining Estimate: 0h > > Add a PTransform for estimating the number of distinct elements in a > PCollection and the number of distinct values associated with each key in a > PCollection of KVs. > It should offer the same API as its Java counterpart: > https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
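The estimation idea discussed above — hash every element, keep the smallest hash values, and extrapolate the population size from how densely those samples fill the hash space — can be sketched in a few lines. This is only an illustration of the principle, not Beam's implementation; `sha256` stands in for mmh3 here so the sketch is deterministic and dependency-free.

```python
import hashlib

def normalized_hash(element):
    # Map an element to a deterministic, roughly uniform value in [0, 1).
    digest = hashlib.sha256(str(element).encode()).hexdigest()
    return int(digest, 16) / 2.0 ** 256

def approximate_unique(elements, sample_size=64):
    # Keep the sample_size smallest *distinct* hash values.
    sample = sorted({normalized_hash(e) for e in elements})[:sample_size]
    if len(sample) < sample_size:
        return len(sample)  # fewer distinct elements than the sample size
    # If sample_size uniform points fill [0, max], the whole [0, 1) space
    # holds roughly sample_size / max distinct values.
    return int(round(sample_size / sample[-1]))

# Duplicates do not change the estimate: only distinct hash values count.
estimate = approximate_unique(list(range(10000)) * 3, sample_size=64)
```

This also shows why a skewed hash hurts: if hash values cluster, `sample[-1]` misrepresents the sampled fraction of the space and the extrapolation drifts, which is what the spreadsheet comparison above demonstrates for Python 2's builtin hash.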
[jira] [Work logged] (BEAM-6683) Add an integration test suite for cross-language transforms for Flink runner
[ https://issues.apache.org/jira/browse/BEAM-6683?focusedWorklogId=242894&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242894 ] ASF GitHub Bot logged work on BEAM-6683: Author: ASF GitHub Bot Created on: 15/May/19 22:11 Start Date: 15/May/19 22:11 Worklog Time Spent: 10m Work Description: ihji commented on issue #8174: [BEAM-6683] add createCrossLanguageValidatesRunner task URL: https://github.com/apache/beam/pull/8174#issuecomment-492479614 Run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242894) Time Spent: 6h 20m (was: 6h 10m) > Add an integration test suite for cross-language transforms for Flink runner > > > Key: BEAM-6683 > URL: https://issues.apache.org/jira/browse/BEAM-6683 > Project: Beam > Issue Type: Test > Components: testing >Reporter: Chamikara Jayalath >Assignee: Heejong Lee >Priority: Major > Time Spent: 6h 20m > Remaining Estimate: 0h > > We should add an integration test suite that covers the following. > (1) Currently available Java IO connectors that do not use UDFs work for > Python SDK on Flink runner. > (2) Currently available Python IO connectors that do not use UDFs work for > Java SDK on Flink runner. > (3) Currently available Java/Python pipelines work in a scalable manner for > cross-language pipelines (for example, try 10GB, 100GB input for > textio/avroio for Java and Python). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6683) Add an integration test suite for cross-language transforms for Flink runner
[ https://issues.apache.org/jira/browse/BEAM-6683?focusedWorklogId=242893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242893 ] ASF GitHub Bot logged work on BEAM-6683: Author: ASF GitHub Bot Created on: 15/May/19 22:11 Start Date: 15/May/19 22:11 Worklog Time Spent: 10m Work Description: ihji commented on issue #8174: [BEAM-6683] add createCrossLanguageValidatesRunner task URL: https://github.com/apache/beam/pull/8174#issuecomment-492539808 Run python precommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242893) Time Spent: 6h 10m (was: 6h) > Add an integration test suite for cross-language transforms for Flink runner > > > Key: BEAM-6683 > URL: https://issues.apache.org/jira/browse/BEAM-6683 > Project: Beam > Issue Type: Test > Components: testing >Reporter: Chamikara Jayalath >Assignee: Heejong Lee >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > > We should add an integration test suite that covers the following. > (1) Currently available Java IO connectors that do not use UDFs work for > Python SDK on Flink runner. > (2) Currently available Python IO connectors that do not use UDFs work for > Java SDK on Flink runner. > (3) Currently available Java/Python pipelines work in a scalable manner for > cross-language pipelines (for example, try 10GB, 100GB input for > textio/avroio for Java and Python). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7331) Missing util function for late pane in java PAssert
Ruoyun Huang created BEAM-7331: -- Summary: Missing util function for late pane in java PAssert Key: BEAM-7331 URL: https://issues.apache.org/jira/browse/BEAM-7331 Project: Beam Issue Type: Improvement Components: testing Reporter: Ruoyun Huang Assignee: Ruoyun Huang Coming from a user's question: [https://stackoverflow.com/questions/56132551/apache-beam-teststream-finalpane-not-firing-as-expected] There are util functions for all types of Panes, except for LatePane. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242887&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242887 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 21:51 Start Date: 15/May/19 21:51 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#issuecomment-492837116 CC: manisha252@ who is working on performance test infrastructure and may have additional input here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242887) Time Spent: 13h 40m (was: 13.5h) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 13h 40m > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242888 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 21:51 Start Date: 15/May/19 21:51 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#issuecomment-492837116 CC: @manisha252 who is working on performance test infrastructure and may have additional input here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242888) Time Spent: 13h 50m (was: 13h 40m) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 13h 50m > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242873&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242873 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 21:31 Start Date: 15/May/19 21:31 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#discussion_r284459827 ## File path: .test-infra/jenkins/job_PerformanceTests_Python.groovy ## @@ -110,12 +106,12 @@ private void createPythonPerformanceTestJob(PerformanceTestConfigurations testCo testConfig.jobTriggerPhrase) def argMap = [ -beam_sdk: testConfig.sdk, +beam_sdk: 'python', benchmarks : testConfig.benchmarkName, bigquery_table : testConfig.resultTable, beam_it_class : testConfig.itClass, beam_it_module : testConfig.itModule, -beam_prebuilt : testConfig.prebuilt.toString(), +beam_prebuilt : 'true', Review comment: The comment says `// always true for Python tests` but does not explain why this needs to be true, so this configuration bit remains a little cryptic... Could we add a simple explanation? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242873) Time Spent: 13.5h (was: 13h 20m) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 13.5h > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6988) TypeHints Py3 Error: test_non_function (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+
[ https://issues.apache.org/jira/browse/BEAM-6988?focusedWorklogId=242866&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242866 ] ASF GitHub Bot logged work on BEAM-6988: Author: ASF GitHub Bot Created on: 15/May/19 21:24 Start Date: 15/May/19 21:24 Worklog Time Spent: 10m Work Description: udim commented on issue #8530: [BEAM-6988] solved problem related to updates of the str object URL: https://github.com/apache/beam/pull/8530#issuecomment-492829278 BTW, I'm still trying to come up with a fix for 3.7. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242866) Time Spent: 3h 20m (was: 3h 10m) > TypeHints Py3 Error: test_non_function > (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+ > - > > Key: BEAM-6988 > URL: https://issues.apache.org/jira/browse/BEAM-6988 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: niklas Hansson >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > {noformat} > Traceback (most recent call last): > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/typed_pipeline_test.py", > line 53, in test_non_function > result = ['xa', 'bbx', 'xcx'] | beam.Map(str.strip, 'x') > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 510, in _ror_ > result = p.apply(self, pvalueish, label) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/pipeline.py", > line 514, in apply > 
transform.type_check_inputs(pvalueish) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 753, in type_check_inputs > hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1]) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/decorators.py", > line 283, in getcallargs_forhints > raise TypeCheckError(e) > apache_beam.typehints.decorators.TypeCheckError: strip() missing 1 required > positional argument: 'chars'{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
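The traceback above comes from type-hint introspection of the builtin `str.strip`, whose reported signature changed across CPython 3.x releases. As a hypothetical workaround sketch (not the fix under review in PR #8530), wrapping the builtin in a plain Python function gives the type-check machinery an ordinary signature to inspect:

```python
import inspect

# Introspecting a builtin method can behave differently across CPython
# versions; a plain Python wrapper always exposes a conventional signature.
def strip_chars(s, chars):
    return s.strip(chars)

print(inspect.signature(strip_chars))  # (s, chars)

# The same inputs as the failing test case, via the wrapper:
result = [strip_chars(s, 'x') for s in ['xa', 'bbx', 'xcx']]
```

In a pipeline this would correspond to something like `beam.Map(strip_chars, 'x')` instead of `beam.Map(str.strip, 'x')`, at the cost of an extra function definition.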
[jira] [Created] (BEAM-7330) Log4j conflict causes stuck job in Dataflow
Anton Kedin created BEAM-7330: - Summary: Log4j conflict causes stuck job in Dataflow Key: BEAM-7330 URL: https://issues.apache.org/jira/browse/BEAM-7330 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Anton Kedin [https://stackoverflow.com/questions/56143496/beam-pipeline-not-moving-in-google-dataflow-while-running-ok-on-direct-runner?noredirect=1#comment98943808_56143496 ] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242849 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 20:50 Start Date: 15/May/19 20:50 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284445270

## File path: sdks/python/apache_beam/transforms/stats.py

@@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import logging
+import math
+import sys
+from builtins import round
+
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+try:
+  import mmh3
+except ImportError:
+  logging.info('Python version >= 3.0 uses the builtin hash function.')
+
+__all__ = [
+    'ApproximateUniqueGlobally',
+    'ApproximateUniquePerKey',
+]
+
+
+class ApproximateUniqueGlobally(PTransform):
+  """
+  Hashes input elements and uses those to extrapolate the size of the entire
+  set of hash values by assuming the rest of the hash values are as densely
+  distributed as the sample space.
+
+  Args:
+    **kwargs: Accepts a single named argument "size" or "error".
+    size: an int not smaller than 16, which we would use to estimate
+      the number of unique values.
+    error: max estimation error, which is a float between 0.01
+      and 0.50. If error is given, size will be calculated from error with
+      the _get_sample_size_from_est_error function.
+  """
+
+  _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+  _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+                         'Received {size = %s, error = %s}.'
+  _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+                        '<= 0.50. In general, the estimation error is about ' \
+                        '2 / sqrt(sample_size). Received {size = %s}.'
+  _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+                         'between 0.01 and 0.50. Received {error = %s}.'
+
+  def __init__(self, **kwargs):
+    input_size = kwargs.pop('size', None)
+    input_err = kwargs.pop('error', None)
+
+    if None not in (input_size, input_err):
+      raise ValueError(self._MULTI_VALUE_ERR_MSG % (input_size, input_err))
+    elif input_size is None and input_err is None:
+      raise ValueError(self._NO_VALUE_ERR_MSG)
+    elif input_size is not None:
+      if not isinstance(input_size, int) or input_size < 16:
+        raise ValueError(self._INPUT_SIZE_ERR_MSG % (input_size))
+      else:
+        self._sample_size = input_size
+        self._max_est_err = None
+    else:
+      if input_err < 0.01 or input_err > 0.5:
+        raise ValueError(self._INPUT_ERROR_ERR_MSG % (input_err))
+      else:
+        self._sample_size = self._get_sample_size_from_est_error(input_err)
+        self._max_est_err = input_err
+
+  def expand(self, pcoll):
+    return pcoll \
+        | 'CountGlobalUniqueValues' \
+        >> (CombineGlobally(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+  @staticmethod
+  def _get_sample_size_from_est_error(est_err):
+    """
+    :return: sample size
+
+    Calculate sample size from estimation error
+    """
+    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))
+
+
+class ApproximateUniquePerKey(ApproximateUniqueGlobally):
+
+  def expand(self, pcoll):
+    return pcoll \
+        | 'CountPerKeyUniqueValues' \
+        >> (CombinePerKey(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+
+class _LargestUnique(object):
+  """
+  An object to keep samples and calculate sample hash space. It is an
+  accumulator of a combine function.
+  """
+  _HASH_SPACE_SIZE = 2.0 * sys.maxsize
+
+  def __init__(self, sample_size):
+    self._sample_size = sample_size
+
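The `_get_sample_size_from_est_error` helper in the patch above encodes the relationship stated in the error message — estimation error ≈ 2 / sqrt(sample_size), i.e. sample_size = 4 / error² — and a quick standalone check of the formula confirms the boundary values:

```python
import math

def sample_size_from_error(est_err):
    # sample_size = 4 / error**2, rounded up (since error ~ 2 / sqrt(size)),
    # mirroring _get_sample_size_from_est_error in the patch above.
    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))

# The maximum allowed error (0.50) maps to the minimum allowed size (16).
sizes = {err: sample_size_from_error(err) for err in (0.5, 0.1, 0.01)}
```

So the tightest supported error of 0.01 requires a sample of 40000 hash values, which is why the transform rejects errors below that threshold.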
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242848&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242848 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 20:49 Start Date: 15/May/19 20:49 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r28852

## File path: sdks/python/apache_beam/transforms/stats.py

@@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import logging
+import math
+import sys
+from builtins import round
+
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+try:
+  import mmh3
+except ImportError:
+  logging.info('Python version >= 3.0 uses the builtin hash function.')
+
+__all__ = [
+    'ApproximateUniqueGlobally',
+    'ApproximateUniquePerKey',
+]
+
+
+class ApproximateUniqueGlobally(PTransform):
+  """
+  Hashes input elements and uses those to extrapolate the size of the entire
+  set of hash values by assuming the rest of the hash values are as densely
+  distributed as the sample space.
+
+  Args:
+    **kwargs: Accepts a single named argument "size" or "error".
+    size: an int not smaller than 16, which we would use to estimate
+      the number of unique values.
+    error: max estimation error, which is a float between 0.01
+      and 0.50. If error is given, size will be calculated from error with
+      the _get_sample_size_from_est_error function.
+  """
+
+  _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+  _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+                         'Received {size = %s, error = %s}.'
+  _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+                        '<= 0.50. In general, the estimation error is about ' \
+                        '2 / sqrt(sample_size). Received {size = %s}.'
+  _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+                         'between 0.01 and 0.50. Received {error = %s}.'
+
+  def __init__(self, **kwargs):
+    input_size = kwargs.pop('size', None)
+    input_err = kwargs.pop('error', None)
+
+    if None not in (input_size, input_err):
+      raise ValueError(self._MULTI_VALUE_ERR_MSG % (input_size, input_err))
+    elif input_size is None and input_err is None:
+      raise ValueError(self._NO_VALUE_ERR_MSG)
+    elif input_size is not None:
+      if not isinstance(input_size, int) or input_size < 16:
+        raise ValueError(self._INPUT_SIZE_ERR_MSG % (input_size))
+      else:
+        self._sample_size = input_size
+        self._max_est_err = None
+    else:
+      if input_err < 0.01 or input_err > 0.5:
+        raise ValueError(self._INPUT_ERROR_ERR_MSG % (input_err))
+      else:
+        self._sample_size = self._get_sample_size_from_est_error(input_err)
+        self._max_est_err = input_err
+
+  def expand(self, pcoll):
+    return pcoll \
+        | 'CountGlobalUniqueValues' \
+        >> (CombineGlobally(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+  @staticmethod
+  def _get_sample_size_from_est_error(est_err):
+    """
+    :return: sample size
+
+    Calculate sample size from estimation error
+    """
+    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))
+
+
+class ApproximateUniquePerKey(ApproximateUniqueGlobally):
+
+  def expand(self, pcoll):
+    return pcoll \
+        | 'CountPerKeyUniqueValues' \
+        >> (CombinePerKey(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+
+class _LargestUnique(object):
+  """
+  An object to keep samples and calculate sample hash space. It is an
+  accumulator of a combine function.
+  """
+  _HASH_SPACE_SIZE = 2.0 * sys.maxsize
+
+  def __init__(self, sample_size):
+    self._sample_size = sample_size
+
[jira] [Commented] (BEAM-7261) Add support for BasicSessionCredentials for aws credentials.
[ https://issues.apache.org/jira/browse/BEAM-7261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840730#comment-16840730 ] David Brown commented on BEAM-7261: --- The problem isn't the provider; it is the AwsModule that serializes/deserializes the AWSCredentialsProvider, which only supports certain credentials. See: [https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java#L107-L127] > Add support for BasicSessionCredentials for aws credentials. > > > Key: BEAM-7261 > URL: https://issues.apache.org/jira/browse/BEAM-7261 > Project: Beam > Issue Type: Improvement > Components: io-java-aws >Reporter: David Brown >Assignee: Ismaël Mejía >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > Currently AWS for Beam only supports basic AWS credentials with a secret and > a key. Need to support session tokens for s3 instances with tighter > credentials. Would involve adding BasicSessionCredentials to the AwsModule. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7327) Revise BQ integration tests to clearly communicate that BQ IO expects base64-encoded bytes.
[ https://issues.apache.org/jira/browse/BEAM-7327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840700#comment-16840700 ] Valentyn Tymofieiev commented on BEAM-7327: --- As discussed on the mailing list, we should take another look at the changes in https://github.com/apache/beam/pull/8047. > Revise BQ integration tests to clearly communicate that BQ IO expects > base64-encoded bytes. > > > Key: BEAM-7327 > URL: https://issues.apache.org/jira/browse/BEAM-7327 > Project: Beam > Issue Type: Sub-task > Components: io-python-gcp >Reporter: Valentyn Tymofieiev >Assignee: Juta Staes >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7319) Multiple NeedsRunner tests break with Flink 'Cannot union streams of different types'
[ https://issues.apache.org/jira/browse/BEAM-7319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840699#comment-16840699 ] Maximilian Michels commented on BEAM-7319: -- This is blocked on BEAM-6523. > Multiple NeedsRunner tests break with Flink 'Cannot union streams of > different types' > - > > Key: BEAM-7319 > URL: https://issues.apache.org/jira/browse/BEAM-7319 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > This happens in AvroIOTest for reference. This also breaks most of the Schema > related tests, for reference SelectTest. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7319) Multiple NeedsRunner tests break with Flink 'Cannot union streams of different types'
[ https://issues.apache.org/jira/browse/BEAM-7319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated BEAM-7319: - Priority: Major (was: Critical) > Multiple NeedsRunner tests break with Flink 'Cannot union streams of > different types' > - > > Key: BEAM-7319 > URL: https://issues.apache.org/jira/browse/BEAM-7319 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > This happens in AvroIOTest for reference. This also breaks most of the Schema > related tests, for reference SelectTest. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7154) Switch internal Go SDK error code to use new package
[ https://issues.apache.org/jira/browse/BEAM-7154?focusedWorklogId=242804&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242804 ] ASF GitHub Bot logged work on BEAM-7154: Author: ASF GitHub Bot Created on: 15/May/19 19:38 Start Date: 15/May/19 19:38 Worklog Time Spent: 10m Work Description: akedin commented on pull request #8560: [BEAM-7154] Updating Go SDK errors (Part 3 - Final) URL: https://github.com/apache/beam/pull/8560 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242804) Time Spent: 1h 50m (was: 1h 40m) > Switch internal Go SDK error code to use new package > > > Key: BEAM-7154 > URL: https://issues.apache.org/jira/browse/BEAM-7154 > Project: Beam > Issue Type: Improvement > Components: sdk-go >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > I added a new package for errors in the Go SDK: > [https://github.com/apache/beam/pull/8369] > This issue tracks progress on modifying existing error code, which mostly > uses fmt.Errorf, to use this new package. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7319) Multiple NeedsRunner tests break with Flink 'Cannot union streams of different types'
[ https://issues.apache.org/jira/browse/BEAM-7319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7319: --- Priority: Critical (was: Major) > Multiple NeedsRunner tests break with Flink 'Cannot union streams of > different types' > - > > Key: BEAM-7319 > URL: https://issues.apache.org/jira/browse/BEAM-7319 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Critical > > This happens in AvroIOTest for reference. This also breaks most of the Schema > related tests, for reference SelectTest. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7329) Update Avro to version 1.9.0 in Python SDK
Ismaël Mejía created BEAM-7329: -- Summary: Update Avro to version 1.9.0 in Python SDK Key: BEAM-7329 URL: https://issues.apache.org/jira/browse/BEAM-7329 Project: Beam Issue Type: Improvement Components: sdk-py-core Reporter: Ismaël Mejía Avro 1.9.0 was released recently, catching up on two years of improvements and bug fixes, so it is probably a worthwhile upgrade for the implementations that still rely on Apache Avro (Python 2). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7328) Update Avro to version 1.9.0 in Java SDK
Ismaël Mejía created BEAM-7328: -- Summary: Update Avro to version 1.9.0 in Java SDK Key: BEAM-7328 URL: https://issues.apache.org/jira/browse/BEAM-7328 Project: Beam Issue Type: Improvement Components: sdk-java-core Reporter: Ismaël Mejía Assignee: Ismaël Mejía Avro 1.9.0 has nice improvements such as a reduced size (1 MB less), multiple dependencies that are no longer needed (Guava, paranamer, etc.), as well as cleanups in its APIs so they no longer expose or depend on Jackson, making it a worthwhile upgrade. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7315) GatherAllPanesTest.multiplePanesMultipleReifiedPane breaks on Spark runner
[ https://issues.apache.org/jira/browse/BEAM-7315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7315: --- Summary: GatherAllPanesTest.multiplePanesMultipleReifiedPane breaks on Spark runner (was: .GatherAllPanesTest.multiplePanesMultipleReifiedPane breaks on Spark runner) > GatherAllPanesTest.multiplePanesMultipleReifiedPane breaks on Spark runner > -- > > Key: BEAM-7315 > URL: https://issues.apache.org/jira/browse/BEAM-7315 > Project: Beam > Issue Type: Sub-task > Components: runner-spark >Reporter: Ismaël Mejía >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7319) Multiple NeedsRunner tests break with Flink 'Cannot union streams of different types'
[ https://issues.apache.org/jira/browse/BEAM-7319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7319: --- Summary: Multiple NeedsRunner tests break with Flink 'Cannot union streams of different types' (was: Multple NeedsRunner tests break because of 'Cannot union streams of different types') > Multiple NeedsRunner tests break with Flink 'Cannot union streams of > different types' > - > > Key: BEAM-7319 > URL: https://issues.apache.org/jira/browse/BEAM-7319 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > This happens in AvroIOTest for reference. This also breaks most of the Schema > related tests, for reference SelectTest. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7327) Revise BQ integration tests to clearly communicate that BQ IO expects base64-encoded bytes.
[ https://issues.apache.org/jira/browse/BEAM-7327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valentyn Tymofieiev updated BEAM-7327: -- Status: Open (was: Triage Needed) > Revise BQ integration tests to clearly communicate that BQ IO expects > base64-encoded bytes. > > > Key: BEAM-7327 > URL: https://issues.apache.org/jira/browse/BEAM-7327 > Project: Beam > Issue Type: Sub-task > Components: io-python-gcp >Reporter: Valentyn Tymofieiev >Assignee: Juta Staes >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7327) Revise BQ integration tests to clearly communicate that BQ IO expects base64-encoded bytes.
Valentyn Tymofieiev created BEAM-7327: - Summary: Revise BQ integration tests to clearly communicate that BQ IO expects base64-encoded bytes. Key: BEAM-7327 URL: https://issues.apache.org/jira/browse/BEAM-7327 Project: Beam Issue Type: Sub-task Components: io-python-gcp Reporter: Valentyn Tymofieiev Assignee: Juta Staes -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7326) Document that Beam BigQuery IO expects users to pass base64-encoded bytes, and BQ IO serves base64-encoded bytes to the user.
Valentyn Tymofieiev created BEAM-7326: - Summary: Document that Beam BigQuery IO expects users to pass base64-encoded bytes, and BQ IO serves base64-encoded bytes to the user. Key: BEAM-7326 URL: https://issues.apache.org/jira/browse/BEAM-7326 Project: Beam Issue Type: Bug Components: io-java-gcp, io-python-gcp Reporter: Valentyn Tymofieiev BYTES is one of the data types supported by Google Cloud BigQuery and by the Apache Beam BigQuery IO connector. The current implementation of the BigQuery connector in the Java and Python SDKs expects users to base64-encode bytes before passing them to BigQuery IO; see the discussion on dev: [1] This needs to be reflected in the public documentation; see [2-4] cc: [~juta] [~chamikara] [~pabloem] cc: [~rebo] [~kedin] FYI and to advise whether similar action needs to be done for Go SDK and/or Beam SQL. [1] https://lists.apache.org/thread.html/f35c836887014e059527ed1a806e730321e2f9726164a3030575f455@%3Cdev.beam.apache.org%3E [2] https://beam.apache.org/documentation/io/built-in/google-bigquery/ [3] https://beam.apache.org/releases/pydoc/2.12.0/apache_beam.io.gcp.bigquery.html [4] https://beam.apache.org/releases/javadoc/2.12.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
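As the issue above describes, both SDKs currently require the user to do the base64 conversion themselves. A minimal sketch of what that looks like on the Python side (the field name "payload" and the row shape are hypothetical illustrations, not taken from the issue):

```python
import base64

# Raw bytes produced upstream in the pipeline.
raw = b"\x00\x01binary payload\xff"

# Beam's BigQuery IO currently expects a BYTES field to be handed over as
# base64 text, so encode before building the row for WriteToBigQuery.
row = {"payload": base64.b64encode(raw).decode("ascii")}

# On the read side, BigQuery IO serves base64 text back, so decode it.
assert base64.b64decode(row["payload"]) == raw
```

The same convention applies on the Java side; BEAM-7326 is about making this explicit in the docs rather than changing the behaviour.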
[jira] [Commented] (BEAM-7230) Using JdbcIO creates huge amount of connections
[ https://issues.apache.org/jira/browse/BEAM-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840666#comment-16840666 ] Brachi Packter commented on BEAM-7230: -- Hi again. Checking with the new snapshot. When I configure my own data source, it works great, thanks. I also tested this way (the default implementation), and then I very quickly get a "too many connections" error...
{code:java}
pipeline.apply(JdbcIO.>read()
    .withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(
        JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver",
            "jdbc:mysql://hostname:3306/mydb",
            "username",
            "password")))
{code}
With the custom data source I process 50k queries with 1,000 connections; with the default data source I process 20k queries with 4,000 connections (the limit). Do you think it could be related to another configuration I set, like connection timeout, max pool size and more? I didn't check the code, but with the default implementation (the code above), do we still create a data source pool for each DoFn? If yes, then we should change this too, to be statically initialized per JVM. > Using JdbcIO creates huge amount of connections > --- > > Key: BEAM-7230 > URL: https://issues.apache.org/jira/browse/BEAM-7230 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.11.0 >Reporter: Brachi Packter >Assignee: Ismaël Mejía >Priority: Major > > I want to write from Dataflow to GCP Cloud SQL. I'm using a connection pool, > and still I see a huge amount of connections in GCP SQL (4k while I set the > connection pool to 300), most of them sleeping. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5967) ProtoCoder doesn't support DynamicMessage
[ https://issues.apache.org/jira/browse/BEAM-5967?focusedWorklogId=242744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242744 ] ASF GitHub Bot logged work on BEAM-5967: Author: ASF GitHub Bot Created on: 15/May/19 18:52 Start Date: 15/May/19 18:52 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #8496: [BEAM-5967] Add handling of DynamicMessage in ProtoCoder URL: https://github.com/apache/beam/pull/8496#issuecomment-492779218 If you set `serialVersionUID` to exactly what it already is, then you keep compatibility nicely, no? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242744) Time Spent: 3h 10m (was: 3h) > ProtoCoder doesn't support DynamicMessage > - > > Key: BEAM-5967 > URL: https://issues.apache.org/jira/browse/BEAM-5967 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 2.8.0 >Reporter: Alex Van Boxel >Assignee: Alex Van Boxel >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > The ProtoCoder does make some assumptions about static messages being > available. The DynamicMessage doesn't have some of them, mainly because the > proto schema is defined at runtime and not at compile time. > Does it make sense to make a special coder for DynamicMessage or build it > into the normal ProtoCoder. 
> Here is an example of the assumption being made in the current coder:
> {code:java}
> try {
>   @SuppressWarnings("unchecked")
>   T protoMessageInstance =
>       (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
>   @SuppressWarnings("unchecked")
>   Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
>   memoizedParser = tParser;
> } catch (IllegalAccessException | InvocationTargetException |
>     NoSuchMethodException e) {
>   throw new IllegalArgumentException(e);
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6693) ApproximateUnique transform for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242731&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242731 ] ASF GitHub Bot logged work on BEAM-6693: Author: ASF GitHub Bot Created on: 15/May/19 18:31 Start Date: 15/May/19 18:31 Worklog Time Spent: 10m Work Description: robinyqiu commented on pull request #8535: [BEAM-6693] ApproximateUnique transform for Python SDK URL: https://github.com/apache/beam/pull/8535#discussion_r284393523 ## File path: sdks/python/apache_beam/transforms/stats.py ## @@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import logging
+import math
+import sys
+from builtins import round
+
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+try:
+  import mmh3
+except ImportError:
+  logging.info('Python version >=3.0 uses buildin hash function.')
+
+__all__ = [
+    'ApproximateUniqueGlobally',
+    'ApproximateUniquePerKey',
+]
+
+
+class ApproximateUniqueGlobally(PTransform):
+  """
+  Hashes input elements and uses those to extrapolate the size of the entire
+  set of hash values by assuming the rest of the hash values are as densely
+  distributed as the sample space.
+
+  Args:
+    **kwargs: Accepts a single named argument "size" or "error".
+    size: an int not smaller than 16, which we would use to estimate
+      number of unique values.
+    error: max estimation error, which is a float between 0.01
+      and 0.50. If error is given, size will be calculated from error with
+      _get_sample_size_from_est_error function.
+  """
+
+  _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+  _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+                         'Received {size = %s, error = %s}.'
+  _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+                        '<= 0.50. In general, the estimation error is about ' \
+                        '2 / sqrt(sample_size). Received {size = %s}.'
+  _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+                         'between 0.01 and 0.50. Received {error = %s}.'
+
+  def __init__(self, **kwargs):
+    input_size = kwargs.pop('size', None)
+    input_err = kwargs.pop('error', None)
+
+    if None not in (input_size, input_err):
+      raise ValueError(self._MULTI_VALUE_ERR_MSG % (input_size, input_err))
+    elif input_size is None and input_err is None:
+      raise ValueError(self._NO_VALUE_ERR_MSG)
+    elif input_size is not None:
+      if not isinstance(input_size, int) or input_size < 16:
+        raise ValueError(self._INPUT_SIZE_ERR_MSG % (input_size))
+      else:
+        self._sample_size = input_size
+        self._max_est_err = None
+    else:
+      if input_err < 0.01 or input_err > 0.5:
+        raise ValueError(self._INPUT_ERROR_ERR_MSG % (input_err))
+      else:
+        self._sample_size = self._get_sample_size_from_est_error(input_err)
+        self._max_est_err = input_err
+
+  def expand(self, pcoll):
+    return pcoll \
+           | 'CountGlobalUniqueValues' \
+           >> (CombineGlobally(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+  @staticmethod
+  def _get_sample_size_from_est_error(est_err):
+    """
+    :return: sample size
+
+    Calculate sample size from estimation error
+    """
+    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))
+
+
+class ApproximateUniquePerKey(ApproximateUniqueGlobally):
+
+  def expand(self, pcoll):
+    return pcoll \
+           | 'CountPerKeyUniqueValues' \
+           >> (CombinePerKey(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+
+class _LargestUnique(object):
+  """
+  An object to keep samples and calculate sample hash space. It is an
+  accumulator of a combine function.
+  """
+  _HASH_SPACE_SIZE = 2.0 * sys.maxsize
+
+  def __init__(self, sample_size):
+    self._sample_size = sample_size
+    s
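The `_get_sample_size_from_est_error` helper in the patch above encodes the relation error ≈ 2 / sqrt(sample_size), i.e. size = ceil(4 / error²). A quick standalone check of that formula:

```python
import math

def get_sample_size_from_est_error(est_err):
    # size = ceil(4 / error^2), from error ~= 2 / sqrt(sample_size)
    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))

# The loosest allowed error (0.50) maps to the minimum sample size of 16;
# the tightest allowed error (0.01) maps to 40000 samples.
assert get_sample_size_from_est_error(0.5) == 16
assert get_sample_size_from_est_error(0.01) == 40000
```

This also explains the `size >= 16` validation in the constructor: any size below 16 would imply an error bound above the 0.50 maximum.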
[jira] [Updated] (BEAM-7322) PubSubIO watermark does not advance for very low volumes
[ https://issues.apache.org/jira/browse/BEAM-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-7322: -- Description: I have identified an issue where the watermark does not advance when using the Beam PubSubIO when volumes are very low. I have created a mini example project to demonstrate the behaviour with a python script for generating messages at different frequencies: https://github.com/tims/beam/tree/master/pubsub-watermark [note: this is in a directory of a Beam fork for corp hoop jumping convenience on my end, it is not intended for merging]. The behaviour is easily replicated if you apply a fixed window triggering after the watermark passes the end of the window.
{code}
pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
    .apply(ParDo.of(new ParseScoreEventFn()))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardSeconds(60))
        .discardingFiredPanes())
    .apply(MapElements.into(kvs(strings(), integers()))
        .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
    .apply(Count.perKey())
    .apply(ParDo.of(Log.of("counted per key")));
{code}
With this triggering, using both the Flink local runner and the direct runner, panes will be fired after a long delay (minutes) for low frequencies of messages in pubsub (seconds). The biggest issue is that it seems no panes will ever be emitted if you just send a few events and stop. This is particularly likely to trip up people new to Beam. If I change the triggering to have early firings I get exactly the emitted panes that you would expect. 
{code}
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .alignedTo(Duration.standardSeconds(60))))
    .withAllowedLateness(Duration.standardSeconds(60))
    .discardingFiredPanes())
{code}
I can use any variation of early firing triggers and they work as expected. We believe that the watermark is not advancing when the volume is too low because of the sampling that PubSubIO does to determine its watermark: it just never has a large enough sample. This problem occurs in the direct runner and the Flink runner, but not in the Dataflow runner, because Dataflow uses its own PubSubIO implementation, which has access to internal details of Pub/Sub and so doesn't need to do any sampling. For extra context from the user@ list: *Kenneth Knowles:* Thanks to your info, I think it is the configuration of MovingFunction [1] that is the likely culprit, but I don't totally understand why. It is configured like so:
- store 60 seconds of data
- update data every 5 seconds
- require at least 10 messages to be 'significant'
- require messages from at least 2 distinct 5 second update periods to be 'significant'
I would expect a rate of 1 message per second to satisfy this. I may have read something wrong. Have you filed an issue in Jira [2]? Kenn [1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508 [2] https://issues.apache.org/jira/projects/BEAM/issues *Alexey Romanenko:* Not sure that this can be very helpful but I recall a similar issue with KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed. [1] https://issues.apache.org/jira/browse/BEAM-5063 [2] https://github.com/apache/beam/pull/6178 was: I have identified an issue where the watermark does not advance when using the beam PubSubIO when volumes are very low. 
I have created a mini example project to demonstrate the behaviour with a python script for generating messages at different frequencies: https://github.com/tims/beam/tree/master/pubsub-watermark [note: this is in a directory of a Beam fork for corp hoop jumping convenience on my end, it is not intended for merging]. The behaviour is easily replicated if you apply a fixed window triggering after the watermark passes the end of the window. pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription)) .apply(ParDo.of(new ParseScoreEventFn())) .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60))) .triggering(AfterWatermark.pastEndOfWindow()) .withAllowedLateness(Duration.standardSeconds(60)) .discardingFiredPanes()) .apply(MapElements.into(kvs(strings(), integers())) .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore( .apply(Count.perKey()) .apply(ParDo.of(Log.of("counted per key"))); With this triggering, using both the flink local runner the direct runner, panes will be fired after a long delay (minutes) for low frequencies of messages in pubsub (s
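Kenn's reading of the MovingFunction configuration can be sanity-checked with quick arithmetic. The sketch below is a standalone model of the significance criteria quoted in the discussion (the real logic lives in PubsubUnboundedSource's MovingFunction; the names and function here are illustrative, not Beam APIs):

```python
# Significance criteria quoted above: at least 10 samples total, spread
# across at least 2 distinct 5-second update periods, within a 60-second
# sample window.
SAMPLE_PERIOD_S = 60
UPDATE_PERIOD_S = 5
MIN_SAMPLES = 10
MIN_PERIODS = 2

def is_significant(message_timestamps_s):
    """Model of the 'significant sample' test over message arrival times."""
    newest = max(message_timestamps_s)
    recent = [t for t in message_timestamps_s if t >= newest - SAMPLE_PERIOD_S]
    periods = {t // UPDATE_PERIOD_S for t in recent}
    return len(recent) >= MIN_SAMPLES and len(periods) >= MIN_PERIODS

# One message per second for a minute: 60 samples across 12 update periods,
# which comfortably satisfies both thresholds -- matching Kenn's expectation.
assert is_significant(list(range(60)))

# A single burst of 10 messages fails the distinct-period requirement.
assert not is_significant([100] * 10)
```

By this model, 1 message/second should indeed be "significant", which is why the observed stalling at that rate suggests a bug rather than intended behaviour.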
[jira] [Work logged] (BEAM-6138) Add User Metric Support to Java SDK
[ https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=242718&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242718 ] ASF GitHub Bot logged work on BEAM-6138: Author: ASF GitHub Bot Created on: 15/May/19 18:13 Start Date: 15/May/19 18:13 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #8280: [BEAM-6138] Update java SDK to report user distribution tuple metrics over the FN API URL: https://github.com/apache/beam/pull/8280 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242718) Time Spent: 14h 20m (was: 14h 10m) > Add User Metric Support to Java SDK > --- > > Key: BEAM-6138 > URL: https://issues.apache.org/jira/browse/BEAM-6138 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Fix For: 3.0.0 > > Time Spent: 14h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6916) Reorganize Beam SQL docs
[ https://issues.apache.org/jira/browse/BEAM-6916?focusedWorklogId=242714&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242714 ] ASF GitHub Bot logged work on BEAM-6916: Author: ASF GitHub Bot Created on: 15/May/19 18:02 Start Date: 15/May/19 18:02 Worklog Time Spent: 10m Work Description: rosetn commented on issue #8455: [BEAM-6916] Reorg Beam SQL docs and add Calcite section URL: https://github.com/apache/beam/pull/8455#issuecomment-492760380 Updated the name standardization This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242714) Time Spent: 4h (was: 3h 50m) Remaining Estimate: 164h (was: 164h 10m) > Reorganize Beam SQL docs > > > Key: BEAM-6916 > URL: https://issues.apache.org/jira/browse/BEAM-6916 > Project: Beam > Issue Type: New Feature > Components: website >Reporter: Rose Nguyen >Assignee: Rose Nguyen >Priority: Minor > Original Estimate: 168h > Time Spent: 4h > Remaining Estimate: 164h > > This page describes the Calcite SQL dialect supported by Beam SQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6916) Reorganize Beam SQL docs
[ https://issues.apache.org/jira/browse/BEAM-6916?focusedWorklogId=242713&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242713 ] ASF GitHub Bot logged work on BEAM-6916: Author: ASF GitHub Bot Created on: 15/May/19 18:02 Start Date: 15/May/19 18:02 Worklog Time Spent: 10m Work Description: rosetn commented on issue #8455: [BEAM-6916] Reorg Beam SQL docs and add Calcite section URL: https://github.com/apache/beam/pull/8455#issuecomment-492760380 Updated the name standardizations This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242713) Time Spent: 3h 50m (was: 3h 40m) Remaining Estimate: 164h 10m (was: 164h 20m) > Reorganize Beam SQL docs > > > Key: BEAM-6916 > URL: https://issues.apache.org/jira/browse/BEAM-6916 > Project: Beam > Issue Type: New Feature > Components: website >Reporter: Rose Nguyen >Assignee: Rose Nguyen >Priority: Minor > Original Estimate: 168h > Time Spent: 3h 50m > Remaining Estimate: 164h 10m > > This page describes the Calcite SQL dialect supported by Beam SQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner
[ https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=242711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242711 ] ASF GitHub Bot logged work on BEAM-7305: Author: ASF GitHub Bot Created on: 15/May/19 17:57 Start Date: 15/May/19 17:57 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #8410: [BEAM-7305] Add first version of Hazelcast Jet based Java Runner URL: https://github.com/apache/beam/pull/8410#discussion_r284379886 ## File path: runners/jet-experimental/build.gradle ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file Review comment: @adude3141 should there be some publishing config here to get the artifactId set up? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242711) Time Spent: 50m (was: 40m) > Add first version of Hazelcast Jet Runner > - > > Key: BEAM-7305 > URL: https://issues.apache.org/jira/browse/BEAM-7305 > Project: Beam > Issue Type: New Feature > Components: runner-jet >Reporter: Maximilian Michels >Assignee: Jozsef Bartok >Priority: Major > Fix For: 2.14.0 > > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner
[ https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=242709&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242709 ] ASF GitHub Bot logged work on BEAM-7305: Author: ASF GitHub Bot Created on: 15/May/19 17:56 Start Date: 15/May/19 17:56 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #8410: [BEAM-7305] Add first version of Hazelcast Jet based Java Runner URL: https://github.com/apache/beam/pull/8410#discussion_r284379323 ## File path: runners/jet-experimental/build.gradle ## @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature()
+
+description = "Apache Beam :: Runners :: Hazelcast Jet"
+
+evaluationDependsOn(":sdks:java:core")
+evaluationDependsOn(":runners:core-java")
+
+project.ext {
+  jet_version = '3.0'
+  hazelcast_version = '3.12'
+}
+
+configurations {
+  validatesRunner
+}
+
+dependencies {
+  shadow project(path: ":sdks:java:core", configuration: "shadow")
+  shadow project(path: ":runners:core-java", configuration: "shadow")
+  shadow "com.hazelcast.jet:hazelcast-jet:$jet_version"
+
+  shadowTest project(path: ":sdks:java:core", configuration: "shadowTest")
+  shadowTest project(path: ":runners:core-java", configuration: "shadowTest")
+  shadowTest library.java.hamcrest_core
+  shadowTest library.java.junit
+  shadowTest "com.hazelcast.jet:hazelcast-jet-core:$jet_version:tests"
+  shadowTest "com.hazelcast:hazelcast:$hazelcast_version:tests"
+  shadowTest "com.hazelcast:hazelcast-client:$hazelcast_version:tests"
+
+  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
+  validatesRunner project(path: ":runners:core-java", configuration: "shadowTest")
+  validatesRunner project(path: project.path, configuration: "shadowTest")
+}
+
+task validatesRunnerBatch(type: Test) {
+  group = "Verification"
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+      "--runner=TestJetRunner",
+      "--jetGroupName=jet",
+      "--jetLocalParallelism=2"
+  ])
+
+  classpath = configurations.validatesRunner
+  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
+  useJUnit {
+    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' // impulse doesn't cooperate properly with Jet when multiple cluster members are used
+    exclude '**/SplittableDoFnTest.class' // Splittable DoFn functionality not yet in the runner
+  }
+
+  maxHeapSize = '4g'
+}
+
+task validatesRunner {
+  group = "Verification"
+  description "Validates Jet runner"
+  dependsOn validatesRunnerBatch
+}
+
+spotless {
+  java {
+    paddedCell()
Review comment: Ah I just noticed this. Yes, we have hit bugs that slowed the whole community down because of paddedCell. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242709) Time Spent: 40m (was: 0.5h) > Add first version of Hazelcast Jet Runner > - > > Key: BEAM-7305 > URL: https://issues.apache.org/jira/browse/BEAM-7305 > Project: Beam > Issue Type: New Feature > Components: runner-jet >Reporter: Maximilian Michels >Assignee: Jozsef Bartok >Priority: Major > Fix For: 2.14.0 > > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6988) TypeHints Py3 Error: test_non_function (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+
[ https://issues.apache.org/jira/browse/BEAM-6988?focusedWorklogId=242708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242708 ] ASF GitHub Bot logged work on BEAM-6988: Author: ASF GitHub Bot Created on: 15/May/19 17:42 Start Date: 15/May/19 17:42 Worklog Time Spent: 10m Work Description: udim commented on issue #8530: [BEAM-6988] solved problem related to updates of the str object URL: https://github.com/apache/beam/pull/8530#issuecomment-492753407 `fn.__objclass__` is a partial type hint for some builtin methods, since it doesn't provide type information to all possible arguments. In Py3.7:
```py
>>> inspect.signature(str.isupper).bind('a')  # okay
>>> inspect.signature(str.strip).bind('a')    # missing optional arg chars
>>> inspect.signature(str.join).bind('a')     # missing required arg iterable
TypeError: missing a required argument: 'iterable'
```
We can avoid the TypeError above by using `bind_partial()`, but the type information for arguments beyond `self` will be missing from all builtins. Hence, these are partial type hints. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242708) Time Spent: 3h 10m (was: 3h) > TypeHints Py3 Error: test_non_function > (apache_beam.typehints.typed_pipeline_test.MainInputTest) Fails on Python 3.7+ > - > > Key: BEAM-6988 > URL: https://issues.apache.org/jira/browse/BEAM-6988 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: niklas Hansson >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > {noformat} > Traceback (most recent call last): > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/typed_pipeline_test.py", > line 53, in test_non_function > result = ['xa', 'bbx', 'xcx'] | beam.Map(str.strip, 'x') > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 510, in _ror_ > result = p.apply(self, pvalueish, label) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/pipeline.py", > line 514, in apply > transform.type_check_inputs(pvalueish) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/ptransform.py", > line 753, in type_check_inputs > hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1]) > File > "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/typehints/decorators.py", > line 283, in getcallargs_forhints > raise TypeCheckError(e) > apache_beam.typehints.decorators.TypeCheckError: strip() missing 1 required > positional argument: 'chars'{noformat} -- This message was sent by 
Atlassian JIRA (v7.6.3#76005)
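The `bind()` vs `bind_partial()` behaviour discussed in the thread above is easy to reproduce on Python 3.7+ with just the standard library:

```python
import inspect

sig = inspect.signature(str.join)  # text signature: ($self, iterable, /)

# bind() enforces all required parameters, so binding only `self` fails.
try:
    sig.bind('a')
    bind_succeeded = True
except TypeError:
    bind_succeeded = False

# bind_partial() accepts the incomplete argument list, but then no type
# information exists for the unbound parameters, hence "partial type hints".
partial = sig.bind_partial('a')

assert not bind_succeeded
assert list(partial.arguments) == ['self']
```

This is the core trade-off in the PR discussion: `bind_partial()` avoids the TypeError in BEAM-6988's traceback, at the cost of only partial hints for builtin methods.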
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242705&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242705 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 17:41 Start Date: 15/May/19 17:41 Worklog Time Spent: 10m Work Description: markflyhigh commented on pull request #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#discussion_r284372674 ## File path: .test-infra/jenkins/job_PerformanceTests_Python.groovy ## @@ -18,46 +18,107 @@ import CommonJobProperties as commonJobProperties -// This job runs the Beam Python performance tests on PerfKit Benchmarker. -job('beam_PerformanceTests_Python'){ - // Set default Beam job properties. - commonJobProperties.setTopLevelMainJobProperties(delegate) - - // Run job in postcommit every 6 hours, don't trigger every push. - commonJobProperties.setAutoJob( - delegate, - 'H */6 * * *') - - // Allows triggering this build against pull requests. 
- commonJobProperties.enablePhraseTriggeringFromPullRequest( - delegate, - 'Python SDK Performance Test', - 'Run Python Performance Test') - - def pipelineArgs = [ - project: 'apache-beam-testing', - staging_location: 'gs://temp-storage-for-end-to-end-tests/staging-it', - temp_location: 'gs://temp-storage-for-end-to-end-tests/temp-it', - output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output' - ] - def pipelineArgList = [] - pipelineArgs.each({ -key, value -> pipelineArgList.add("--$key=$value") - }) - def pipelineArgsJoined = pipelineArgList.join(',') - - def argMap = [ - beam_sdk : 'python', - benchmarks : 'beam_integration_benchmark', - bigquery_table : 'beam_performance.wordcount_py_pkb_results', - beam_it_class: 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it', - beam_it_module : 'sdks/python', - beam_prebuilt: 'true', // skip beam prebuild - beam_python_sdk_location : 'build/apache-beam.tar.gz', - beam_runner : 'TestDataflowRunner', - beam_it_timeout : '1200', - beam_it_args : pipelineArgsJoined, - ] - - commonJobProperties.buildPerformanceTest(delegate, argMap) + +class PerformanceTestConfigurations { + String jobName + String jobDescription + String jobTriggerPhrase + String buildSchedule = 'H */6 * * *' // every 6 hours + String benchmarkName = 'beam_integration_benchmark' + String sdk = 'python' + String bigqueryTable + String itClass + String itModule + Boolean skipPrebuild = false + String pythonSdkLocation + String runner = 'TestDataflowRunner' + Integer itTimeout = 1200 + Map extraPipelineArgs +} + +// Common pipeline args for Dataflow job. +def dataflowPipelineArgs = [ +project : 'apache-beam-testing', +staging_location: 'gs://temp-storage-for-end-to-end-tests/staging-it', +temp_location : 'gs://temp-storage-for-end-to-end-tests/temp-it', +] + + +// Configurations of each Jenkins job. 
+def testConfigurations = [ +new PerformanceTestConfigurations( +jobName : 'beam_PerformanceTests_Python', +jobDescription: 'Python SDK Performance Test', +jobTriggerPhrase : 'Run Python Performance Test', +bigqueryTable : 'beam_performance.wordcount_py_pkb_results', +skipPrebuild : true, +pythonSdkLocation : 'build/apache-beam.tar.gz', +itClass : 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it', +itModule : 'sdks/python', +extraPipelineArgs : dataflowPipelineArgs + [ +output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output' +], +), +new PerformanceTestConfigurations( +jobName : 'beam_PerformanceTests_Python35', +jobDescription: 'Python35 SDK Performance Test', +jobTriggerPhrase : 'Run Python35 Performance Test', +bigqueryTable : 'beam_performance.wordcount_py35_pkb_results', +skipPrebuild : true, +pythonSdkLocation : 'test-suites/dataflow/py35/build/apache-beam.tar.gz', +itClass : 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it', Review comment: sg. done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242705) > Add
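The Groovy job above folds a map of Dataflow pipeline options into a single comma-separated `--key=value` string before handing it to the benchmark. A minimal Python sketch of that same transformation (the helper name is illustrative, not part of Beam or PerfKit):

```python
def join_pipeline_args(args):
    """Render a dict of pipeline options as the comma-separated
    --key=value string expected by the benchmark's beam_it_args flag,
    mirroring the each/join logic in the Groovy job above."""
    return ",".join(f"--{key}={value}" for key, value in args.items())


pipeline_args = {
    "project": "apache-beam-testing",
    "staging_location": "gs://temp-storage-for-end-to-end-tests/staging-it",
}
# → "--project=apache-beam-testing,--staging_location=gs://temp-storage-for-end-to-end-tests/staging-it"
print(join_pipeline_args(pipeline_args))
```

Because Python dicts preserve insertion order, extra args merged in later (as with `extraPipelineArgs` in the Groovy config) simply append to the end of the string.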
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242706 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 17:41 Start Date: 15/May/19 17:41 Worklog Time Spent: 10m Work Description: markflyhigh commented on pull request #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#discussion_r284372701 ## File path: .test-infra/jenkins/job_PerformanceTests_Python.groovy ## @@ -110,12 +106,12 @@ private void createPythonPerformanceTestJob(PerformanceTestConfigurations testCo testConfig.jobTriggerPhrase) def argMap = [ -beam_sdk: testConfig.sdk, +beam_sdk: 'python', benchmarks : testConfig.benchmarkName, bigquery_table : testConfig.resultTable, beam_it_class : testConfig.itClass, beam_it_module : testConfig.itModule, -beam_prebuilt : testConfig.prebuilt.toString(), +beam_prebuilt : 'true', Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242706) Time Spent: 13h 10m (was: 13h) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 13h 10m > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242703 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 17:41 Start Date: 15/May/19 17:41 Worklog Time Spent: 10m Work Description: markflyhigh commented on pull request #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#discussion_r284022805 ## File path: .test-infra/jenkins/job_PerformanceTests_Python.groovy ## @@ -18,46 +18,107 @@ import CommonJobProperties as commonJobProperties -// This job runs the Beam Python performance tests on PerfKit Benchmarker. -job('beam_PerformanceTests_Python'){ - // Set default Beam job properties. - commonJobProperties.setTopLevelMainJobProperties(delegate) - - // Run job in postcommit every 6 hours, don't trigger every push. - commonJobProperties.setAutoJob( - delegate, - 'H */6 * * *') - - // Allows triggering this build against pull requests. 
- commonJobProperties.enablePhraseTriggeringFromPullRequest( - delegate, - 'Python SDK Performance Test', - 'Run Python Performance Test') - - def pipelineArgs = [ - project: 'apache-beam-testing', - staging_location: 'gs://temp-storage-for-end-to-end-tests/staging-it', - temp_location: 'gs://temp-storage-for-end-to-end-tests/temp-it', - output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output' - ] - def pipelineArgList = [] - pipelineArgs.each({ -key, value -> pipelineArgList.add("--$key=$value") - }) - def pipelineArgsJoined = pipelineArgList.join(',') - - def argMap = [ - beam_sdk : 'python', - benchmarks : 'beam_integration_benchmark', - bigquery_table : 'beam_performance.wordcount_py_pkb_results', - beam_it_class: 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it', - beam_it_module : 'sdks/python', - beam_prebuilt: 'true', // skip beam prebuild - beam_python_sdk_location : 'build/apache-beam.tar.gz', - beam_runner : 'TestDataflowRunner', - beam_it_timeout : '1200', - beam_it_args : pipelineArgsJoined, - ] - - commonJobProperties.buildPerformanceTest(delegate, argMap) + +class PerformanceTestConfigurations { + String jobName + String jobDescription + String jobTriggerPhrase + String buildSchedule = 'H */6 * * *' // every 6 hours + String benchmarkName = 'beam_integration_benchmark' + String sdk = 'python' + String bigqueryTable + String itClass + String itModule Review comment: I think people should know how Perfkit `beam_integration_benchmark` works before configuring in Jenkins. Probably we need better document for that, and also happy to sync with you offline for more details. 
For your question, `beam_integration_benchmark` uses the Gradle task [`integrationTest`](https://github.com/GoogleCloudPlatform/PerfKitBenchmarker/blob/master/perfkitbenchmarker/beam_benchmark_helper.py#L226), which can be enabled through [`enablePythonPerformanceTest`](https://github.com/apache/beam/blob/master/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L1773). So `beam_it_module` is the Gradle project where `integrationTest` is located. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242703) Time Spent: 12h 50m (was: 12h 40m) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 12h 50m > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242704&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242704 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 17:41 Start Date: 15/May/19 17:41 Worklog Time Spent: 10m Work Description: markflyhigh commented on pull request #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#discussion_r284023084 ## File path: .test-infra/jenkins/job_PerformanceTests_Python.groovy ## @@ -28,25 +28,23 @@ class PerformanceTestConfigurations { String jobTriggerPhrase // Frequency of the job build, default to every 6 hours String buildSchedule = 'H */6 * * *' - // A benchmark flag, will pass to "--benchmarkName" + // A benchmark defined flag, will pass to benchmark as "--benchmarkName" String benchmarkName = 'beam_integration_benchmark' - // A benchmark flag, will pass to "--beam_sdk" - String sdk = 'python' - // A benchmark flag, will pass to "--bigqueryTable" + // A benchmark defined flag, will pass to benchmark as "--bigqueryTable" String resultTable - // A benchmark flag, will pass to "--beam_it_class" + // A benchmark defined flag, will pass to benchmark as "--beam_it_class" String itClass - // A benchmark flag, will pass to "--beam_it_module" + // A benchmark defined flag, will pass to benchmark as "--beam_it_module" String itModule - // A benchmark flag, will pass to "--beam_prebuilt" - Boolean prebuilt = false - // A benchmark flag, will pass to "--beam_python_sdk_location" + // A benchmark defined flag, will pass to benchmark as "--beam_python_sdk_location" + // If not provided, benchmark will search through the project recursively + // or fails immediately. 
Review comment: The search behavior is defined [inside benchmark](https://github.com/GoogleCloudPlatform/PerfKitBenchmarker/blob/master/perfkitbenchmarker/beam_benchmark_helper.py#L290) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242704) Time Spent: 13h (was: 12h 50m) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 13h > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6908) Add Python3 performance benchmarks
[ https://issues.apache.org/jira/browse/BEAM-6908?focusedWorklogId=242707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242707 ] ASF GitHub Bot logged work on BEAM-6908: Author: ASF GitHub Bot Created on: 15/May/19 17:41 Start Date: 15/May/19 17:41 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #8518: [BEAM-6908] Refactor Python performance test groovy file for easy configuration URL: https://github.com/apache/beam/pull/8518#issuecomment-492752950 PTAL @tvalentyn This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242707) Time Spent: 13h 20m (was: 13h 10m) > Add Python3 performance benchmarks > -- > > Key: BEAM-6908 > URL: https://issues.apache.org/jira/browse/BEAM-6908 > Project: Beam > Issue Type: Sub-task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 13h 20m > Remaining Estimate: 0h > > Similar to > [beam_PerformanceTests_Python|https://builds.apache.org/view/A-D/view/Beam/view/PerformanceTests/job/beam_PerformanceTests_Python/], > we want to have a Python3 benchmark running on Jenkins to detect performance > regression during code adoption. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7131) Spark portable runner appears to be repeating work (in TFX example)
[ https://issues.apache.org/jira/browse/BEAM-7131?focusedWorklogId=242696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242696 ] ASF GitHub Bot logged work on BEAM-7131: Author: ASF GitHub Bot Created on: 15/May/19 17:28 Start Date: 15/May/19 17:28 Worklog Time Spent: 10m Work Description: ibzib commented on pull request #8558: [BEAM-7131] Spark: cache executable stage output to prevent re-computation URL: https://github.com/apache/beam/pull/8558#discussion_r284367565 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java ## @@ -224,6 +224,11 @@ private static void translateImpulse( MetricsAccumulator.getInstance()); JavaRDD staged = inputRdd.mapPartitions(function); +// Prevent potentially expensive re-computation of executable stage +if (outputs.size() > 1) { + staged.cache(); Review comment: Flattening the output RDDs (or "leaves") seems to have no effect on re-computation. As far as I understand, Spark caches nothing unless explicitly commanded by the user. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242696) Time Spent: 3h (was: 2h 50m) > Spark portable runner appears to be repeating work (in TFX example) > --- > > Key: BEAM-7131 > URL: https://issues.apache.org/jira/browse/BEAM-7131 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > > I've been trying to run the TFX Chicago taxi example [1] on the Spark > portable runner. 
TFDV works fine, but the preprocess step > (preprocess_flink.sh [2]) fails with the following error: > RuntimeError: AlreadyExistsError: file already exists [while running > 'WriteTransformFn/WriteTransformFn'] > Assets are being written multiple times to different temp directories, which > is okay, but the error occurs when they are copied to the same permanent > output directory. Specifically, the copy tree operation in transform_fn_io.py > [3] is run twice with the same output directory. The error doesn't occur when > that code is modified to allow overwriting existing files, but that's only a > shallow fix. While the TF transform should probably be made idempotent, this > is also an issue with the Spark runner, which shouldn't be repeating work > like this regularly (in the absence of a failure condition). > [1] [https://github.com/tensorflow/tfx/tree/master/tfx/examples/chicago_taxi] > [2] > [https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi/preprocess_flink.sh] > [3] > [https://github.com/tensorflow/transform/blob/master/tensorflow_transform/beam/tft_beam_io/transform_fn_io.py#L33-L45] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
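The review comment above hinges on Spark's evaluation model: a lazily evaluated RDD is recomputed for every downstream consumer unless the user explicitly caches it, which is why the executable stage ran (and wrote its transform output) twice. A minimal pure-Python sketch of that behaviour (this is not Spark code; names are illustrative, and unlike Spark's lazy `cache()`, this toy caches eagerly):

```python
class LazyStage:
    """Toy stand-in for a lazily evaluated RDD: every consumer re-runs
    the computation unless the stage has been explicitly cached."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self.runs = 0  # counts how often the expensive work actually ran

    def cache(self):
        # Eagerly materialize and retain the result (real Spark caches
        # lazily, on first materialization).
        self._cached = self._compute()
        self.runs += 1
        return self

    def collect(self):
        if self._cached is not None:
            return self._cached
        self.runs += 1
        return self._compute()


stage = LazyStage(lambda: [x * x for x in range(5)])
stage.collect()
stage.collect()
assert stage.runs == 2  # recomputed once per consumer

cached = LazyStage(lambda: [x * x for x in range(5)]).cache()
cached.collect()
cached.collect()
assert cached.runs == 1  # computed once, then served from the cache
```

This mirrors the fix in the PR: when an executable stage has more than one output consumer, calling `cache()` on the staged RDD prevents the stage's side effects from running twice.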
[jira] [Work logged] (BEAM-6916) Reorganize Beam SQL docs
[ https://issues.apache.org/jira/browse/BEAM-6916?focusedWorklogId=242687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242687 ] ASF GitHub Bot logged work on BEAM-6916: Author: ASF GitHub Bot Created on: 15/May/19 17:19 Start Date: 15/May/19 17:19 Worklog Time Spent: 10m Work Description: melap commented on pull request #8455: [BEAM-6916] Reorg Beam SQL docs and add Calcite section URL: https://github.com/apache/beam/pull/8455#discussion_r284361380 ## File path: website/src/documentation/dsls/sql/calcite/aggregate-functions.md ## @@ -18,11 +19,13 @@ See the License for the specific language governing permissions and limitations under the License. --> -# Beam SQL: Aggregate functions +# Beam SQL aggregate functions for Calcite Review comment: can we standardize on a name format for all of the pages in a given section? (except the overview - made a suggestion in another comment) Beam SQL aggregate functions (Calcite) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242687) Time Spent: 3.5h (was: 3h 20m) Remaining Estimate: 164.5h (was: 164h 40m) > Reorganize Beam SQL docs > > > Key: BEAM-6916 > URL: https://issues.apache.org/jira/browse/BEAM-6916 > Project: Beam > Issue Type: New Feature > Components: website >Reporter: Rose Nguyen >Assignee: Rose Nguyen >Priority: Minor > Original Estimate: 168h > Time Spent: 3.5h > Remaining Estimate: 164.5h > > This page describes the Calcite SQL dialect supported by Beam SQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6916) Reorganize Beam SQL docs
[ https://issues.apache.org/jira/browse/BEAM-6916?focusedWorklogId=242689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242689 ] ASF GitHub Bot logged work on BEAM-6916: Author: ASF GitHub Bot Created on: 15/May/19 17:19 Start Date: 15/May/19 17:19 Worklog Time Spent: 10m Work Description: melap commented on pull request #8455: [BEAM-6916] Reorg Beam SQL docs and add Calcite section URL: https://github.com/apache/beam/pull/8455#discussion_r284361843 ## File path: website/src/_includes/section-menu/sdks.html ## @@ -60,19 +60,26 @@ Walkthrough Shell - SQL Reference + Apache Calcite dialect -Data types -Lexical structure -CREATE EXTERNAL TABLE -SELECT -Windowing & Triggering -Joins -Scalar functions -Aggregate functions -User-defined functions -SET Pipeline Options +Overview Review comment: what about "Support overview" or "Calcite support overview"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242689) Time Spent: 3h 40m (was: 3.5h) Remaining Estimate: 164h 20m (was: 164.5h) > Reorganize Beam SQL docs > > > Key: BEAM-6916 > URL: https://issues.apache.org/jira/browse/BEAM-6916 > Project: Beam > Issue Type: New Feature > Components: website >Reporter: Rose Nguyen >Assignee: Rose Nguyen >Priority: Minor > Original Estimate: 168h > Time Spent: 3h 40m > Remaining Estimate: 164h 20m > > This page describes the Calcite SQL dialect supported by Beam SQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6916) Reorganize Beam SQL docs
[ https://issues.apache.org/jira/browse/BEAM-6916?focusedWorklogId=242686&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242686 ] ASF GitHub Bot logged work on BEAM-6916: Author: ASF GitHub Bot Created on: 15/May/19 17:19 Start Date: 15/May/19 17:19 Worklog Time Spent: 10m Work Description: melap commented on pull request #8455: [BEAM-6916] Reorg Beam SQL docs and add Calcite section URL: https://github.com/apache/beam/pull/8455#discussion_r284361719 ## File path: website/src/documentation/dsls/sql/calcite/overview.md ## @@ -0,0 +1,88 @@ +--- +layout: section +title: "Beam SQL in Calcite: Overview" +section_menu: section-menu/sdks.html +permalink: /documentation/dsls/sql/calcite/overview/ +--- + +# Beam SQL Overview (Calcite) Review comment: perhaps this, to be consistent with the overall overview page title? Beam SQL: Calcite support overview This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242686) Time Spent: 3h 20m (was: 3h 10m) Remaining Estimate: 164h 40m (was: 164h 50m) > Reorganize Beam SQL docs > > > Key: BEAM-6916 > URL: https://issues.apache.org/jira/browse/BEAM-6916 > Project: Beam > Issue Type: New Feature > Components: website >Reporter: Rose Nguyen >Assignee: Rose Nguyen >Priority: Minor > Original Estimate: 168h > Time Spent: 3h 20m > Remaining Estimate: 164h 40m > > This page describes the Calcite SQL dialect supported by Beam SQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6916) Reorganize Beam SQL docs
[ https://issues.apache.org/jira/browse/BEAM-6916?focusedWorklogId=242688&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242688 ] ASF GitHub Bot logged work on BEAM-6916: Author: ASF GitHub Bot Created on: 15/May/19 17:19 Start Date: 15/May/19 17:19 Worklog Time Spent: 10m Work Description: melap commented on pull request #8455: [BEAM-6916] Reorg Beam SQL docs and add Calcite section URL: https://github.com/apache/beam/pull/8455#discussion_r284362241 ## File path: website/src/documentation/dsls/sql/extensions/windowing-and-triggering.md ## @@ -18,7 +19,7 @@ See the License for the specific language governing permissions and limitations under the License. --> -# Beam SQL: Windowing and triggering +# Beam SQL extensions: Windowing and triggering Review comment: similarly, not all pages in the Beam SQL extensions section have this prefix on the page title This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242688) Time Spent: 3h 40m (was: 3.5h) Remaining Estimate: 164h 20m (was: 164.5h) > Reorganize Beam SQL docs > > > Key: BEAM-6916 > URL: https://issues.apache.org/jira/browse/BEAM-6916 > Project: Beam > Issue Type: New Feature > Components: website >Reporter: Rose Nguyen >Assignee: Rose Nguyen >Priority: Minor > Original Estimate: 168h > Time Spent: 3h 40m > Remaining Estimate: 164h 20m > > This page describes the Calcite SQL dialect supported by Beam SQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7103) Adding AvroGenericCoder for simple dict type cross-language data transfer
[ https://issues.apache.org/jira/browse/BEAM-7103?focusedWorklogId=242644&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242644 ] ASF GitHub Bot logged work on BEAM-7103: Author: ASF GitHub Bot Created on: 15/May/19 16:36 Start Date: 15/May/19 16:36 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #8342: [BEAM-7103] Adding AvroCoderTranslator for cross-language data transfer URL: https://github.com/apache/beam/pull/8342 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242644) Time Spent: 2h 40m (was: 2.5h) > Adding AvroGenericCoder for simple dict type cross-language data transfer > - > > Key: BEAM-7103 > URL: https://issues.apache.org/jira/browse/BEAM-7103 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core, sdk-py-core >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 2h 40m > Remaining Estimate: 0h > > Adding AvroGenericCoder for simple dict type cross-language data transfer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7103) Adding AvroGenericCoder for simple dict type cross-language data transfer
[ https://issues.apache.org/jira/browse/BEAM-7103?focusedWorklogId=242643&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242643 ] ASF GitHub Bot logged work on BEAM-7103: Author: ASF GitHub Bot Created on: 15/May/19 16:36 Start Date: 15/May/19 16:36 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #8342: [BEAM-7103] Adding AvroCoderTranslator for cross-language data transfer URL: https://github.com/apache/beam/pull/8342#issuecomment-492729524 Thanks. LGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242643) Time Spent: 2.5h (was: 2h 20m) > Adding AvroGenericCoder for simple dict type cross-language data transfer > - > > Key: BEAM-7103 > URL: https://issues.apache.org/jira/browse/BEAM-7103 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core, sdk-py-core >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > Adding AvroGenericCoder for simple dict type cross-language data transfer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-7311) merge internal commits to beam open source trunk to prepare for the security patch
[ https://issues.apache.org/jira/browse/BEAM-7311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Lu resolved BEAM-7311. -- Resolution: Fixed Fix Version/s: 2.13.0 > merge internal commits to beam open source trunk to prepare for the security > patch > -- > > Key: BEAM-7311 > URL: https://issues.apache.org/jira/browse/BEAM-7311 > Project: Beam > Issue Type: Task > Components: runner-samza >Reporter: Hai Lu >Assignee: Hai Lu >Priority: Major > Fix For: 2.13.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Merge the following commits: > * add portable pipeline option and use that for job server driver > * minor refactor in server driver to allow potential code reuse > * miscellaneous fixes on the Samza runner > ** pipeline lifecycle listener to add pipeline option in onInit > ** portable runner to support the Samza metrics reporter > ** add timeout for pipeline cancellation -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7311) merge internal commits to beam open source trunk to prepare for the security patch
[ https://issues.apache.org/jira/browse/BEAM-7311?focusedWorklogId=242633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242633 ] ASF GitHub Bot logged work on BEAM-7311: Author: ASF GitHub Bot Created on: 15/May/19 16:28 Start Date: 15/May/19 16:28 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #8582: [BEAM-7311] merge internal commits to beam open source trunk to prepare for the security patch URL: https://github.com/apache/beam/pull/8582 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242633) Time Spent: 0.5h (was: 20m) > merge internal commits to beam open source trunk to prepare for the security > patch > -- > > Key: BEAM-7311 > URL: https://issues.apache.org/jira/browse/BEAM-7311 > Project: Beam > Issue Type: Task > Components: runner-samza >Reporter: Hai Lu >Assignee: Hai Lu >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > Merge the following commits: > * add portable pipeline option and use that for job server driver > * minor refactor in server driver to allow potential code reuse > * miscellaneous fixes on the Samza runner > ** pipeline lifecycle listener to add pipeline option in onInit > ** portable runner to support the Samza metrics reporter > ** add timeout for pipeline cancellation -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-7305) Add first version of Hazelcast Jet Runner
[ https://issues.apache.org/jira/browse/BEAM-7305?focusedWorklogId=242631&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242631 ] ASF GitHub Bot logged work on BEAM-7305: Author: ASF GitHub Bot Created on: 15/May/19 16:25 Start Date: 15/May/19 16:25 Worklog Time Spent: 10m Work Description: adude3141 commented on pull request #8410: [BEAM-7305] Add first version of Hazelcast Jet based Java Runner URL: https://github.com/apache/beam/pull/8410#discussion_r284342494 ## File path: runners/jet-experimental/build.gradle ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import groovy.json.JsonOutput + +plugins { id 'org.apache.beam.module' } +applyJavaNature() + +description = "Apache Beam :: Runners :: Hazelcast Jet" + +evaluationDependsOn(":sdks:java:core") +evaluationDependsOn(":runners:core-java") + +project.ext { +jet_version = '3.0' +hazelcast_version = '3.12' +} + +configurations { +validatesRunner +} + +dependencies { +shadow project(path: ":sdks:java:core", configuration: "shadow") +shadow project(path: ":runners:core-java", configuration: "shadow") +shadow "com.hazelcast.jet:hazelcast-jet:$jet_version" + +shadowTest project(path: ":sdks:java:core", configuration: "shadowTest") +shadowTest project(path: ":runners:core-java", configuration: "shadowTest") +shadowTest library.java.hamcrest_core +shadowTest library.java.junit +shadowTest "com.hazelcast.jet:hazelcast-jet-core:$jet_version:tests" +shadowTest "com.hazelcast:hazelcast:$hazelcast_version:tests" +shadowTest "com.hazelcast:hazelcast-client:$hazelcast_version:tests" + +validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") +validatesRunner project(path: ":runners:core-java", configuration: "shadowTest") +validatesRunner project(path: project.path, configuration: "shadowTest") +} + +task validatesRunnerBatch(type: Test) { +group = "Verification" +systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ +"--runner=TestJetRunner", +"--jetGroupName=jet", +"--jetLocalParallelism=2" +]) + +classpath = configurations.validatesRunner +testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) +useJUnit { +includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' +excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' //impulse doesn't cooperate properly with Jet when multiple cluster members are used +exclude '**/SplittableDoFnTest.class' //Splittable DoFn functionality not yet in the runner +} + +maxHeapSize = '4g' +} + +task validatesRunner { +group = "Verification" +description "Validates Jet 
runner" +dependsOn validatesRunnerBatch +} + +spotless { +java { +paddedCell() Review comment: Could you elaborate on why this is needed? Currently, `spotlessJavaCheck` passes, but `spotlessApply` resolves some issues in the codebase. Removing `paddedCell` will make `spotlessJavaCheck` fail. So I am wondering whether we could just remove this configuration? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 242631) Time Spent: 0.5h (was: 20m) > Add first version of Hazelcast Jet Runner > - > > Key: BEAM-7305 > URL: https://issues.apache.org/jira/browse/BEAM-7305 > Project: Beam > Issue Type: New Feature > Components: runner-jet >Reporter: Maximilian Michels >Assignee: Jozsef Bartok >Priority: Major > Fix For: 2.14.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7323) PartitionTest.testDroppedPartition breaks on Flink runner
[ https://issues.apache.org/jira/browse/BEAM-7323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7323: --- Status: Open (was: Triage Needed) > PartitionTest.testDroppedPartition breaks on Flink runner > - > > Key: BEAM-7323 > URL: https://issues.apache.org/jira/browse/BEAM-7323 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > This happens too in GroupByKeyTest$WindowTests.testIdentityWindowFnPropagation > {code} > java.lang.AssertionError > : > Flatten.PCollections.out: > Expected: iterable with items [<2>, <4>, <5>, <7>, <8>, <10>, <11>] in any > order > but: not matched: <2> > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7320) TextIOWriteTest.testWriteViaSink breaks on Flink runner
[ https://issues.apache.org/jira/browse/BEAM-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7320: --- Status: Open (was: Triage Needed) > TextIOWriteTest.testWriteViaSink breaks on Flink runner > --- > > Key: BEAM-7320 > URL: https://issues.apache.org/jira/browse/BEAM-7320 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Minor > > {code} > java.lang.AssertionError > : > Read All/TextIO.ReadFiles/Read all via FileBasedSource/Read > ranges/ParMultiDo(ReadFileRanges).output: > Expected: iterable with items ["a", "b", "c", "d", "e", "f"] in any order > but: no item matches: "a", "c" in ["b", "e", "d", "f"] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7318) WriteFilesTest.testEmptyWrite breaks on Flink runner
[ https://issues.apache.org/jira/browse/BEAM-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7318: --- Status: Open (was: Triage Needed) > WriteFilesTest.testEmptyWrite breaks on Flink runner > > > Key: BEAM-7318 > URL: https://issues.apache.org/jira/browse/BEAM-7318 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Minor > > java.lang.AssertionError: expected:<1> but was:<4> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7321) TestStreamTest breaks on Flink runner
[ https://issues.apache.org/jira/browse/BEAM-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7321: --- Status: Open (was: Triage Needed) > TestStreamTest breaks on Flink runner > - > > Key: BEAM-7321 > URL: https://issues.apache.org/jira/browse/BEAM-7321 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > {code} > java.lang.AssertionError > : > Flatten.Iterables/FlattenIterables/FlatMap/ParMultiDo(Anonymous).output: > Expected: iterable with items [<1>, <2>, <3>, <4>, <5>] in any order > but: not matched: <-2> > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7325) PipelineTest.testEmptyPipeline breaks on Flink runner
Ismaël Mejía created BEAM-7325: -- Summary: PipelineTest.testEmptyPipeline breaks on Flink runner Key: BEAM-7325 URL: https://issues.apache.org/jira/browse/BEAM-7325 Project: Beam Issue Type: Bug Components: runner-flink Reporter: Ismaël Mejía {code} org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1534) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:89) at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:141) at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7323) PartitionTest.testDroppedPartition breaks on Flink runner
[ https://issues.apache.org/jira/browse/BEAM-7323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7323: --- Description: This happens too in GroupByKeyTest$WindowTests.testIdentityWindowFnPropagation {code} java.lang.AssertionError : Flatten.PCollections.out: Expected: iterable with items [<2>, <4>, <5>, <7>, <8>, <10>, <11>] in any order but: not matched: <2> {code} was: {code} java.lang.AssertionError : Flatten.PCollections.out: Expected: iterable with items [<2>, <4>, <5>, <7>, <8>, <10>, <11>] in any order but: not matched: <2> {code} > PartitionTest.testDroppedPartition breaks on Flink runner > - > > Key: BEAM-7323 > URL: https://issues.apache.org/jira/browse/BEAM-7323 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > This happens too in GroupByKeyTest$WindowTests.testIdentityWindowFnPropagation > {code} > java.lang.AssertionError > : > Flatten.PCollections.out: > Expected: iterable with items [<2>, <4>, <5>, <7>, <8>, <10>, <11>] in any > order > but: not matched: <2> > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7324) GroupByKeyTest$WindowTests.testWindowFnInvalidation breaks on Flink runner in batch mode
Ismaël Mejía created BEAM-7324: -- Summary: GroupByKeyTest$WindowTests.testWindowFnInvalidation breaks on Flink runner in batch mode Key: BEAM-7324 URL: https://issues.apache.org/jira/browse/BEAM-7324 Project: Beam Issue Type: Bug Components: runner-flink Reporter: Ismaël Mejía {code} java.lang.NullPointerException : (No message provided) at org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asMetricResults(MetricsContainerStepMap.java:129) at org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults(MetricsContainerStepMap.java:113) at org.apache.beam.runners.flink.FlinkRunnerResult.metrics(FlinkRunnerResult.java:76) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7319) Multiple NeedsRunner tests break because of 'Cannot union streams of different types'
[ https://issues.apache.org/jira/browse/BEAM-7319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7319: --- Description: This happens in AvroIOTest for reference. This also breaks most of the Schema-related tests, for reference SelectTest. (was: This happens in AvroIOTest for reference. This also breaks most of the Schema-related tests.) > Multiple NeedsRunner tests break because of 'Cannot union streams of different > types' > > > Key: BEAM-7319 > URL: https://issues.apache.org/jira/browse/BEAM-7319 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > This happens in AvroIOTest for reference. This also breaks most of the Schema-related tests, for reference SelectTest. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7319) Multiple NeedsRunner tests break because of 'Cannot union streams of different types'
[ https://issues.apache.org/jira/browse/BEAM-7319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7319: --- Description: This happens in AvroIOTest for reference. This also breaks most of the Schema-related tests. (was: This happens in AvroIOTest for reference.) > Multiple NeedsRunner tests break because of 'Cannot union streams of different > types' > > > Key: BEAM-7319 > URL: https://issues.apache.org/jira/browse/BEAM-7319 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Minor > > This happens in AvroIOTest for reference. This also breaks most of the Schema-related tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7319) Multiple NeedsRunner tests break because of 'Cannot union streams of different types'
[ https://issues.apache.org/jira/browse/BEAM-7319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7319: --- Priority: Major (was: Minor) > Multiple NeedsRunner tests break because of 'Cannot union streams of different > types' > > > Key: BEAM-7319 > URL: https://issues.apache.org/jira/browse/BEAM-7319 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > This happens in AvroIOTest for reference. This also breaks most of the Schema-related tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7319) Multiple NeedsRunner tests break because of 'Cannot union streams of different types'
[ https://issues.apache.org/jira/browse/BEAM-7319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7319: --- Status: Open (was: Triage Needed) > Multiple NeedsRunner tests break because of 'Cannot union streams of different > types' > > > Key: BEAM-7319 > URL: https://issues.apache.org/jira/browse/BEAM-7319 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > This happens in AvroIOTest for reference. This also breaks most of the Schema-related tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6829) Duplicate metric warnings clutter log
[ https://issues.apache.org/jira/browse/BEAM-6829?focusedWorklogId=242615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242615 ] ASF GitHub Bot logged work on BEAM-6829: Author: ASF GitHub Bot Created on: 15/May/19 16:01 Start Date: 15/May/19 16:01 Worklog Time Spent: 10m Work Description: mxm commented on pull request #8585: [BEAM-6829] Use transform name for metric namespace if none provided URL: https://github.com/apache/beam/pull/8585 In non-portable pipelines, transforms are always scoped by an operator. Metric names do not have to be unique across transforms. In portable pipelines, duplicate metric names inevitably occur if no unique namespace has been set, due to multiple transforms being fused together inside a single operator. This avoids duplicate metric names by using the transform name as the namespace, if none has been provided. It is still possible to provide a custom namespace.
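The namespace fallback the pull request describes can be sketched roughly as follows. This is a hypothetical simplification for illustration only; `MetricNameSketch` and `namespaceFor` are invented names, not part of the Beam codebase:

```java
// Hypothetical sketch of the fallback described above: when a metric is
// reported without a unique namespace, use the reporting transform's name
// instead, so that metrics from transforms fused into a single operator no
// longer collide. A user-supplied namespace is still honored.
public class MetricNameSketch {
    public static String namespaceFor(String userNamespace, String transformName) {
        if (userNamespace == null || userNamespace.isEmpty()) {
            return transformName; // transform names are unique within a pipeline
        }
        return userNamespace;
    }

    public static void main(String[] args) {
        // Two fused transforms each reporting a metric named "elements" now get
        // distinct namespaces, avoiding Flink's "Name collision" warning.
        System.out.println(namespaceFor(null, "ParDo(ParseScoreEventFn)")); // ParDo(ParseScoreEventFn)
        System.out.println(namespaceFor("my.custom.ns", "Count.PerKey"));   // my.custom.ns
    }
}
```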
[jira] [Created] (BEAM-7323) PartitionTest.testDroppedPartition
Ismaël Mejía created BEAM-7323: -- Summary: PartitionTest.testDroppedPartition Key: BEAM-7323 URL: https://issues.apache.org/jira/browse/BEAM-7323 Project: Beam Issue Type: Sub-task Components: runner-flink Reporter: Ismaël Mejía {code} java.lang.AssertionError : Flatten.PCollections.out: Expected: iterable with items [<2>, <4>, <5>, <7>, <8>, <10>, <11>] in any order but: not matched: <2> {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-7323) PartitionTest.testDroppedPartition breaks on Flink runner
[ https://issues.apache.org/jira/browse/BEAM-7323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismaël Mejía updated BEAM-7323: --- Summary: PartitionTest.testDroppedPartition breaks on Flink runner (was: PartitionTest.testDroppedPartition) > PartitionTest.testDroppedPartition breaks on Flink runner > - > > Key: BEAM-7323 > URL: https://issues.apache.org/jira/browse/BEAM-7323 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ismaël Mejía >Priority: Major > > {code} > java.lang.AssertionError > : > Flatten.PCollections.out: > Expected: iterable with items [<2>, <4>, <5>, <7>, <8>, <10>, <11>] in any > order > but: not matched: <2> > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-6829) Duplicate metric warnings clutter log
[ https://issues.apache.org/jira/browse/BEAM-6829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned BEAM-6829: Assignee: Maximilian Michels > Duplicate metric warnings clutter log > - > > Key: BEAM-6829 > URL: https://issues.apache.org/jira/browse/BEAM-6829 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 2.11.0 >Reporter: Thomas Weise >Assignee: Maximilian Michels >Priority: Major > Labels: portability > > Logs fill up quickly with these warnings: > {code:java} > WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already > contains a Metric with the name ...{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7322) PubSubIO watermark does not advance for very low volumes
[ https://issues.apache.org/jira/browse/BEAM-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840472#comment-16840472 ] Tim Sell commented on BEAM-7322: In my example code, I have a job that produces events per pane with their lag, reading from different pubsub streams publishing at different frequencies. Output attached: [^data.json] > PubSubIO watermark does not advance for very low volumes > > > Key: BEAM-7322 > URL: https://issues.apache.org/jira/browse/BEAM-7322 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Tim Sell >Priority: Minor > Attachments: data.json > > > I have identified an issue where the watermark does not advance when using > the Beam PubSubIO when volumes are very low. > I have created a mini example project to demonstrate the behaviour with a > python script for generating messages at different frequencies: > https://github.com/tims/beam/tree/master/pubsub-watermark > [note: this is in a directory of a Beam fork for corp hoop-jumping > convenience on my end, it is not intended for merging]. > The behaviour is easily replicated if you apply a fixed window triggering > after the watermark passes the end of the window. > pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription)) > .apply(ParDo.of(new ParseScoreEventFn())) > .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60))) > .triggering(AfterWatermark.pastEndOfWindow()) > .withAllowedLateness(Duration.standardSeconds(60)) > .discardingFiredPanes()) > .apply(MapElements.into(kvs(strings(), integers())) > .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), > scoreEvent.getScore()))) > .apply(Count.perKey()) > .apply(ParDo.of(Log.of("counted per key"))); > With this triggering, using both the Flink local runner and the direct runner, > panes will be fired after a long delay (minutes) for low frequencies of > messages in pubsub (seconds). The biggest issue is that it seems no panes > will ever be emitted if you just send a few events and stop. This is > particularly likely to trip up people new to Beam. > If I change the triggering to have early firings I get exactly the emitted > panes that you would expect. > .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60))) > .triggering(AfterWatermark.pastEndOfWindow() > .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() > .alignedTo(Duration.standardSeconds(60)))) > .withAllowedLateness(Duration.standardSeconds(60)) > .discardingFiredPanes()) > I can use any variation of early firing triggers and they work as expected. > We believe that the watermark is not advancing when the volume is too low > because of the sampling that PubSubIO does to determine its watermark. It > just never has a large enough sample. > This problem occurs in the direct runner and Flink runner, but not in the > Dataflow runner (because Dataflow uses its own PubSubIO implementation, which > has access to internal details of Pub/Sub and so doesn't need to do any sampling). > For extra context from the user@ list: > *Kenneth Knowles:* > Thanks to your info, I think it is the configuration of MovingFunction [1] > that is the likely culprit, but I don't totally understand why. It is > configured like so: > - store 60 seconds of data > - update data every 5 seconds > - require at least 10 messages to be 'significant' > - require messages from at least 2 distinct 5-second update periods to be > 'significant' > I would expect a rate of 1 message per second to satisfy this. I may have > read something wrong. > Have you filed an issue in Jira [2]? > Kenn > [1] > https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508 > [2] https://issues.apache.org/jira/projects/BEAM/issues > *Alexey Romanenko:* > Not sure that this can be very helpful but I recall a similar issue with > KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed. > [1] https://issues.apache.org/jira/browse/BEAM-5063 > [2] https://github.com/apache/beam/pull/6178 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
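Kenn's reading of the MovingFunction configuration can be sketched as a significance check over 5-second buckets within a 60-second window. This is a deliberately simplified, hypothetical model (the real MovingFunction in PubsubUnboundedSource is more involved), but it illustrates why "send a few events and stop" never yields a significant sample:

```java
import java.util.Arrays;

// Hypothetical simplification of the watermark-sample significance check
// described above: a 60s window split into 5s buckets; an estimate only
// counts as "significant" with at least 10 messages spread over at least
// 2 distinct buckets.
public class SignificanceSketch {
    static final int BUCKETS = 60 / 5;   // 12 five-second buckets
    static final int MIN_MESSAGES = 10;
    static final int MIN_BUCKETS = 2;

    public static boolean isSignificant(int[] messagesPerBucket) {
        int total = 0;
        int nonEmpty = 0;
        for (int n : messagesPerBucket) {
            total += n;
            if (n > 0) nonEmpty++;
        }
        return total >= MIN_MESSAGES && nonEmpty >= MIN_BUCKETS;
    }

    public static void main(String[] args) {
        // 1 message/second => 5 messages in every bucket: significant,
        // which is why Kenn would expect the watermark to advance.
        int[] steady = new int[BUCKETS];
        Arrays.fill(steady, 5);
        System.out.println(isSignificant(steady));      // true

        // A few events, then silence: never reaches 10 messages, so the
        // sample stays insignificant and the watermark stalls.
        int[] fewThenStop = new int[BUCKETS];
        fewThenStop[0] = 3;
        System.out.println(isSignificant(fewThenStop)); // false
    }
}
```

Under this model a steady 1 msg/s stream easily qualifies, matching Kenn's expectation; that stalls are still observed at such rates suggests the real check, or its timestamp bookkeeping, behaves differently in practice.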
[jira] [Updated] (BEAM-7322) PubSubIO watermark does not advance for very low volumes
[ https://issues.apache.org/jira/browse/BEAM-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tim Sell updated BEAM-7322: --- Attachment: data.json > PubSubIO watermark does not advance for very low volumes > > > Key: BEAM-7322 > URL: https://issues.apache.org/jira/browse/BEAM-7322 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Tim Sell >Priority: Minor > Attachments: data.json > -- This message was sent by Atlassian JIRA (v7.6.3#76005)