[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL
ZijieSong946 commented on a change in pull request #12348: URL: https://github.com/apache/beam/pull/12348#discussion_r466175841 ## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java ## @@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) { // TODO: Doing micro to mills truncation, need to throw exception. ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false); break; + case TYPE_DATETIME: +// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for ZetaSQL DATETIME type +// because later it will be unparsed to the string representation of timestamp (e.g. "SELECT +// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT TIMESTAMP '2008-12-25 +// 15:30:00:00'"). So we create a wrapper function here such that we can later recognize +// it and customize its unparsing in BeamBigQuerySqlDialect. +ret = +rexBuilder() +.makeCall( +SqlOperators.createZetaSqlFunction( +BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION, Review comment: It's weird that the overridden `toString()` has never been called. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] saavannanavati commented on a change in pull request #12352: [BEAM-10549] Improve runtime type checking performance for the Python SDK
saavannanavati commented on a change in pull request #12352: URL: https://github.com/apache/beam/pull/12352#discussion_r466168597 ## File path: sdks/python/apache_beam/runners/worker/opcounters.py ## @@ -202,6 +209,37 @@ def __init__( self._sample_counter = 0 self._next_sample = 0 +self.producer_type_hints = None +self.producer_full_label = None +self.producer_parameter_name = None + +if producer and hasattr(producer, 'spec') and hasattr(producer.spec, + 'serialized_fn'): + fns = pickler.loads(producer.spec.serialized_fn) + if fns: +if hasattr(fns[0], '_runtime_type_hints'): + self.producer_type_hints = fns[0]._runtime_type_hints +if hasattr(fns[0], '_full_label'): + self.producer_full_label = fns[0]._full_label +if hasattr(fns[0], '_runtime_parameter_name'): + self.producer_parameter_name = fns[0]._runtime_parameter_name + +self.consumer_type_hints = [] +self.consumer_full_labels = [] +self.consumer_parameter_names = [] + +if consumers: Review comment: I moved as much logic as possible to the visitor (and removed the duplicated code), but kept the producer/consumer distinction because it determines whether we type check against the output types (for the producer) or the input types (for the consumer).
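The producer/consumer distinction described in this comment can be illustrated with a minimal sketch in plain Python. It is not the code from the pull request: `TypeCheckError` here is a local stand-in class, and `check_value` and `check_element` are hypothetical helpers that assume each type hint is an ordinary Python class. The point is only that one element is checked against the producer's output type and against each consumer's input type.

```python
class TypeCheckError(Exception):
    """Stand-in error type for this sketch (not Beam's own class)."""


def check_value(value, expected, label, role):
    """Raise if `value` is not an instance of `expected`; None means unchecked."""
    if expected is not None and not isinstance(value, expected):
        raise TypeCheckError(
            "%s type mismatch in %r: expected %s, got %s"
            % (role, label, expected.__name__, type(value).__name__))


def check_element(value, producer, consumers):
    """Check one element flowing between transforms.

    producer:  (label, output_type) tuple, or None if unknown.
    consumers: list of (label, input_type) tuples.
    """
    if producer is not None:
        label, output_type = producer
        # The producer is checked against what it declared it would emit.
        check_value(value, output_type, label, "output")
    for label, input_type in consumers:
        # Each consumer is checked against what it declared it accepts.
        check_value(value, input_type, label, "input")
```

A mismatch on the producer side is reported as an output error and a mismatch on the consumer side as an input error, so the message can point at the transform whose declaration was violated.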
[GitHub] [beam] saavannanavati commented on a change in pull request #12352: [BEAM-10549] Improve runtime type checking performance for the Python SDK
saavannanavati commented on a change in pull request #12352: URL: https://github.com/apache/beam/pull/12352#discussion_r466166035 ## File path: sdks/python/apache_beam/runners/worker/operations.py ## @@ -238,6 +247,7 @@ def __init__(self, self.execution_context = None # type: Optional[ExecutionContext] self.consumers = collections.defaultdict( list) # type: DefaultDict[int, List[Operation]] +self.producer = None Review comment: Ah thanks, I'm not actually updating the value in the child class, `DoOperation`, so let me get rid of that and directly pass `None`.
[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test
George-Wu commented on a change in pull request #12473: URL: https://github.com/apache/beam/pull/12473#discussion_r466109042 ## File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py ## @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration test for Google Cloud DICOM IO connector. 
+""" +# pytype: skip-file + +from __future__ import absolute_import + +import json +import time +import unittest + +from google.auth import default +from google.auth.transport import requests +from google.cloud import storage +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io import fileio +from apache_beam.io.gcp.dicomio import DicomSearch +from apache_beam.io.gcp.dicomio import UploadToDicomStore +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +REGION = 'us-central1' +DATA_SET_ID = 'apache-beam-integration-testing' +HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1' +PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store" +DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files" +NUM_INSTANCE = 18 + + +def create_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Create a an empty DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. + dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id) + + response = session.post( + dicomweb_path, params={"dicomStoreId": dicom_store_id}) + response.raise_for_status() + return response.status_code + + +def delete_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Delete an existing DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. 
+ dicomweb_path = "{}/datasets/{}/dicomStores/{}".format( + api_endpoint, dataset_id, dicom_store_id) + + response = session.delete(dicomweb_path) + response.raise_for_status() + return response.status_code + + +class DICOMIoIntegrationTest(unittest.TestCase): Review comment: I don't think a test of pub/sub is necessary here because our connector has no direct dependency or invocation of pub/sub. The 'pubsub to search metadata' converter was tested in the unit test while read and write operations can be tested in an easier way here rather than using pub/sub. What's more, testing a streaming pipeline is complicated and needs much more work :(
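The two store helpers quoted in this review build the same REST path in two steps, differing only in whether the store ID is appended. A minimal sketch of that path layout follows. The function name `dicom_stores_path` is hypothetical and not part of the pull request; the base URL and path segments are copied from the quoted diff, and no network call is made.

```python
HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'


def dicom_stores_path(project_id, region, dataset_id, dicom_store_id=None):
    """Return the dicomStores collection path, or one store's path if an ID is given."""
    path = "{}/projects/{}/locations/{}/datasets/{}/dicomStores".format(
        HEALTHCARE_BASE_URL, project_id, region, dataset_id)
    if dicom_store_id is not None:
        # A single store is addressed by appending its ID to the collection path.
        path = "{}/{}".format(path, dicom_store_id)
    return path
```

In the quoted test code the collection path is the target of the create request (with the store ID passed as the `dicomStoreId` query parameter), while the single-store path is the target of the delete request.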
[GitHub] [beam] saavannanavati commented on a change in pull request #12352: [BEAM-10549] Improve runtime type checking performance for the Python SDK
saavannanavati commented on a change in pull request #12352: URL: https://github.com/apache/beam/pull/12352#discussion_r466145176 ## File path: sdks/python/apache_beam/runners/common.py ## @@ -1340,6 +1342,17 @@ def process_outputs( self.per_element_output_counter.add_input(0) return +if isinstance(results, (dict, str, unicode, bytes)): + results_type = type(results).__name__ + raise TypeCheckError( + 'Returning a %s from a ParDo or FlatMap is ' + 'discouraged. Please use list("%s") if you really ' + 'want this behavior.' % (results_type, results)) +elif not isinstance(results, collections.Iterable): Review comment: Why is this check slow? Also, if we remove it, how should we give the user the relevant error message?
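For readers following the question, the check under discussion can be reproduced in isolation. The sketch below is a Python 3 approximation, not the code in `common.py`: it assumes `collections.abc.Iterable` in place of the `collections.Iterable` alias used in the diff, drops the Python 2 `unicode` type, defines a local `TypeCheckError` stand-in, and uses a hypothetical message for the non-iterable branch because the diff is cut off before that message.

```python
from collections.abc import Iterable


class TypeCheckError(Exception):
    """Stand-in error type for this sketch (not Beam's own class)."""


def validate_results(results):
    """Reject DoFn return values that are probably a mistake; return the rest unchanged."""
    if isinstance(results, (dict, str, bytes)):
        # These are iterable, but iterating them element by element
        # (keys, characters, byte values) is rarely what the user meant.
        results_type = type(results).__name__
        raise TypeCheckError(
            'Returning a %s from a ParDo or FlatMap is '
            'discouraged. Please use list("%s") if you really '
            'want this behavior.' % (results_type, results))
    if not isinstance(results, Iterable):
        # Hypothetical wording; the original message is not shown in the diff.
        raise TypeCheckError(
            'Expected an iterable, got %s.' % type(results).__name__)
    return results
```

The `isinstance` test against the abstract `Iterable` class runs for every call to this function, which is the per-call cost the reviewers are weighing against the clearer error message.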
[GitHub] [beam] damondouglas commented on a change in pull request #12448: [BEAM-9679] Add Additional Parameters lesson to Go SDK Katas
damondouglas commented on a change in pull request #12448: URL: https://github.com/apache/beam/pull/12448#discussion_r466143420 ## File path: learning/katas/go/core_transforms/windowing/additional_parameters/task.md ## @@ -0,0 +1,77 @@ + + +# Windowing + +This lesson introduces the concept of windowed PCollection elements. A window is a view into a fixed beginning and +fixed end to a set of data. In the beam model, windowing subdivides a PCollection according to the +timestamps of its individual elements. + +Beam can pass information about the window and timestamp to your elements in your DoFn. All your previous +lessons' DoFn had this information available, yet you never made use of it in your DoFn parameters. In this +lesson you will. The simple toy dataset has five git commit messages and their timestamps Review comment: I like the idea of more windowing lessons after the rest of the originally planned katas are complete.
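The idea quoted from the lesson text, that windowing subdivides a collection according to element timestamps, can be shown without any Beam API. The sketch below is plain Python rather than the Go SDK used by the kata; `window_start` and `assign_fixed_windows` are hypothetical helper names, and it assumes fixed, non-overlapping windows over integer timestamps in seconds.

```python
from collections import defaultdict


def window_start(timestamp, size):
    """Return the start of the fixed window of length `size` containing `timestamp`."""
    return timestamp - (timestamp % size)


def assign_fixed_windows(elements, size):
    """Group (value, timestamp) pairs by their [start, end) fixed window."""
    windows = defaultdict(list)
    for value, timestamp in elements:
        start = window_start(timestamp, size)
        windows[(start, start + size)].append(value)
    return dict(windows)
```

Each element lands in exactly one window, and the window's bounds depend only on the element's timestamp and the window size, which is why a DoFn can be handed the window alongside the element.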
[GitHub] [beam] youngoli commented on a change in pull request #12471: [BEAM-9615] Add initial Schema to Go conversions.
youngoli commented on a change in pull request #12471: URL: https://github.com/apache/beam/pull/12471#discussion_r466120138 ## File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go ## @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package schema contains utility functions for relating Go types and Beam Schemas. +// +// Not all Go types can be converted to schemas. This is Go is more expressive than +// Beam schemas. Just as not all Go types can be serialized, similarly, +// not all Beam Schemas will have a conversion to Go types, until the correct +// mechanism exists in the SDK to handle them. +// +// While efforts will be made to have conversions be reversable, this will not +// be possible in all instances. Eg. Go arrays as fields will be converted to +// Beam Arrays, but a Beam Array type will map by default to a Go slice. +package schema + +import ( + "fmt" + "reflect" + "strings" + + "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" + pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" +) + +// FromType returns a Beam Schema of the passed in type. 
+// Returns an error if the type cannot be converted to a Schema. +func FromType(ot reflect.Type) (*pipepb.Schema, error) { + t := ot // keep the original type for errors. + // The top level schema for a pointer to struct and the struct is the same. + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + if t.Kind() != reflect.Struct { + return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot) + } + return structToSchema(t), nil +} + +func structToSchema(t reflect.Type) *pipepb.Schema { + fields := make([]*pipepb.Field, 0, t.NumField()) + for i := 0; i < t.NumField(); i++ { + fields = append(fields, structFieldToField(t.Field(i))) + } + return &pipepb.Schema{ + Fields: fields, + } +} + +func structFieldToField(sf reflect.StructField) *pipepb.Field { + name := sf.Name + if tag := sf.Tag.Get("beam"); tag != "" { + name, _ = parseTag(tag) + } + ftype := reflectTypeToFieldType(sf.Type) + + return &pipepb.Field{ + Name: name, + Type: ftype, + } +} + +func reflectTypeToFieldType(ot reflect.Type) *pipepb.FieldType { + var isPtr bool + t := ot + if t.Kind() == reflect.Ptr { + isPtr = true + t = t.Elem() + } + switch t.Kind() { + case reflect.Map: + kt := reflectTypeToFieldType(t.Key()) + vt := reflectTypeToFieldType(t.Elem()) + return &pipepb.FieldType{ + Nullable: isPtr, + TypeInfo: &pipepb.FieldType_MapType{ + MapType: &pipepb.MapType{ + KeyType: kt, + ValueType: vt, + }, + }, + } + case reflect.Struct: + sch := structToSchema(t) + return &pipepb.FieldType{ + Nullable: isPtr, + TypeInfo: &pipepb.FieldType_RowType{ + RowType: &pipepb.RowType{ + Schema: sch, + }, + }, + } + case reflect.Slice, reflect.Array: + // Special handling for []byte + if t == reflectx.ByteSlice { + return &pipepb.FieldType{ + Nullable: isPtr, + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_BYTES, + }, + } + } + vt := reflectTypeToFieldType(t.Elem()) + return &pipepb.FieldType{ + Nullable: isPtr, +
[GitHub] [beam] robinyqiu commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL
robinyqiu commented on a change in pull request #12348: URL: https://github.com/apache/beam/pull/12348#discussion_r466122482 ## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java ## @@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) { // TODO: Doing micro to mills truncation, need to throw exception. ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false); break; + case TYPE_DATETIME: +// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for ZetaSQL DATETIME type +// because later it will be unparsed to the string representation of timestamp (e.g. "SELECT +// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT TIMESTAMP '2008-12-25 +// 15:30:00:00'"). So we create a wrapper function here such that we can later recognize +// it and customize its unparsing in BeamBigQuerySqlDialect. +ret = +rexBuilder() +.makeCall( +SqlOperators.createZetaSqlFunction( +BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION, Review comment: To make it clearer, you can get the `TimestampString` value from the `RexLiteral` like https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java#L74, and then use it to create your own `SqlDateTimeLiteral`.
[GitHub] [beam] robinyqiu commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL
robinyqiu commented on a change in pull request #12348: URL: https://github.com/apache/beam/pull/12348#discussion_r466119772 ## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java ## @@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) { // TODO: Doing micro to mills truncation, need to throw exception. ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false); break; + case TYPE_DATETIME: +// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for ZetaSQL DATETIME type +// because later it will be unparsed to the string representation of timestamp (e.g. "SELECT +// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT TIMESTAMP '2008-12-25 +// 15:30:00:00'"). So we create a wrapper function here such that we can later recognize +// it and customize its unparsing in BeamBigQuerySqlDialect. +ret = +rexBuilder() +.makeCall( +SqlOperators.createZetaSqlFunction( +BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION, Review comment: We don't have to extend `SqlTimestampLiteral`. You can extend `SqlLiteral` directly, like in https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java#L95
[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL
ZijieSong946 commented on a change in pull request #12348: URL: https://github.com/apache/beam/pull/12348#discussion_r466111720 ## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java ## @@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) { // TODO: Doing micro to mills truncation, need to throw exception. ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false); break; + case TYPE_DATETIME: +// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for ZetaSQL DATETIME type +// because later it will be unparsed to the string representation of timestamp (e.g. "SELECT +// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT TIMESTAMP '2008-12-25 +// 15:30:00:00'"). So we create a wrapper function here such that we can later recognize +// it and customize its unparsing in BeamBigQuerySqlDialect. +ret = +rexBuilder() +.makeCall( +SqlOperators.createZetaSqlFunction( +BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION, Review comment: We could not create a class extending `SqlTimestampLiteral` because its constructor is private. https://github.com/apache/calcite/blob/2088488ac8327b19512a76a122cae2961fc551c3/core/src/main/java/org/apache/calcite/sql/SqlTimestampLiteral.java#L34
[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test
George-Wu commented on a change in pull request #12473: URL: https://github.com/apache/beam/pull/12473#discussion_r466109325 ## File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py ## @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration test for Google Cloud DICOM IO connector. 
+""" +# pytype: skip-file + +from __future__ import absolute_import + +import json +import time +import unittest + +from google.auth import default +from google.auth.transport import requests +from google.cloud import storage +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io import fileio +from apache_beam.io.gcp.dicomio import DicomSearch +from apache_beam.io.gcp.dicomio import UploadToDicomStore +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +REGION = 'us-central1' +DATA_SET_ID = 'apache-beam-integration-testing' +HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1' +PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store" +DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files" +NUM_INSTANCE = 18 + + +def create_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Create a an empty DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. + dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id) + + response = session.post( + dicomweb_path, params={"dicomStoreId": dicom_store_id}) + response.raise_for_status() + return response.status_code + + +def delete_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Delete an existing DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. 
+ dicomweb_path = "{}/datasets/{}/dicomStores/{}".format( + api_endpoint, dataset_id, dicom_store_id) + + response = session.delete(dicomweb_path) + response.raise_for_status() + return response.status_code + + +class DICOMIoIntegrationTest(unittest.TestCase): + expected_output_metadata = None + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) +self.project = self.test_pipeline.get_option('project') + +# create a temp Dicom store based on the time stamp +self.temp_dicom_store = "DICOM_store_" + str(time.time()) Review comment: Changed.
[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test
George-Wu commented on a change in pull request #12473: URL: https://github.com/apache/beam/pull/12473#discussion_r466109042 ## File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py ## @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration test for Google Cloud DICOM IO connector. 
+""" +# pytype: skip-file + +from __future__ import absolute_import + +import json +import time +import unittest + +from google.auth import default +from google.auth.transport import requests +from google.cloud import storage +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io import fileio +from apache_beam.io.gcp.dicomio import DicomSearch +from apache_beam.io.gcp.dicomio import UploadToDicomStore +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +REGION = 'us-central1' +DATA_SET_ID = 'apache-beam-integration-testing' +HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1' +PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store" +DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files" +NUM_INSTANCE = 18 + + +def create_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Create a an empty DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. + dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id) + + response = session.post( + dicomweb_path, params={"dicomStoreId": dicom_store_id}) + response.raise_for_status() + return response.status_code + + +def delete_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Delete an existing DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. 
+ dicomweb_path = "{}/datasets/{}/dicomStores/{}".format( + api_endpoint, dataset_id, dicom_store_id) + + response = session.delete(dicomweb_path) + response.raise_for_status() + return response.status_code + + +class DICOMIoIntegrationTest(unittest.TestCase): Review comment: I don't think a test of pub/sub is necessary here because our connector has no direct dependency or invocation of pub/sub. The 'pubsub to search metadata' converter was tested in the unit test while read and write operations can be tested in an easier way here rather than using pub/sub. What's more, testing a streaming pipeline is complicated and needs much more work :(
[GitHub] [beam] aaltay commented on pull request #12455: [BEAM-10630] Include data from load tests in the release process
aaltay commented on pull request #12455: URL: https://github.com/apache/beam/pull/12455#issuecomment-669645454 LGTM. Thank you @mxm. (I do not know whether there were more comments on the dev@ list thread; this LGTM as it is.)
[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test
George-Wu commented on a change in pull request #12473: URL: https://github.com/apache/beam/pull/12473#discussion_r466104978 ## File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py ## @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration test for Google Cloud DICOM IO connector. 
+""" +# pytype: skip-file + +from __future__ import absolute_import + +import json +import time +import unittest + +from google.auth import default +from google.auth.transport import requests +from google.cloud import storage +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io import fileio +from apache_beam.io.gcp.dicomio import DicomSearch +from apache_beam.io.gcp.dicomio import UploadToDicomStore +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +REGION = 'us-central1' +DATA_SET_ID = 'apache-beam-integration-testing' +HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1' +PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store" +DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files" +NUM_INSTANCE = 18 + + +def create_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Create a an empty DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. + dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id) + + response = session.post( + dicomweb_path, params={"dicomStoreId": dicom_store_id}) + response.raise_for_status() + return response.status_code + + +def delete_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Delete an existing DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. 
+ dicomweb_path = "{}/datasets/{}/dicomStores/{}".format( + api_endpoint, dataset_id, dicom_store_id) + + response = session.delete(dicomweb_path) + response.raise_for_status() + return response.status_code + + +class DICOMIoIntegrationTest(unittest.TestCase): + expected_output_metadata = None + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) +self.project = self.test_pipeline.get_option('project') + +# create a temp Dicom store based on the time stamp +self.temp_dicom_store = "DICOM_store_" + str(time.time()) +create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store) +client = storage.Client() +bucket = client.get_bucket('temp-storage-for-dicom-io-tests') +blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json') +data = json.loads(blob.download_as_string()) +self.expected_output_metadata = data + + def tearDown(self): +# clean up the temp Dicom store +delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store) + + @attr('IT') + def test_dicom_search(self): +# Search and compare the metadata of a persistent DICOM store. +input_dict = {} +input_dict['project_id'] = self.project +input_dict['region'] = REGION +input_dict['dataset_id'] = DATA_SET_ID +input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME +input_dict['search_type'] = "instances" + +expected_dict = {} +expected_dict['result'] = self.expected_output_metadata +expected_dict['status'] = 200 +expected_dict['input'] = input_dict +expected_dict['success'] = True + +with TestPipeline() as p: + results = (p | beam.Create([input_dict]) | DicomSearch()) + assert_that(results, equal_to([expected_dict])) + + @attr('IT') + def test_dicom_store_instance(self): +# Store DICOM files to a empty DICOM store from a GCS bucket, +# then check if the store metadata match. +input_dict = {} +input_dict['project_id'] = self.project +input_dict['region'] = REGION +input_dict['dataset_id'] = DATA_SET_ID +input_dict['dico
[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test
George-Wu commented on a change in pull request #12473: URL: https://github.com/apache/beam/pull/12473#discussion_r466104754 ## File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py ## @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration test for Google Cloud DICOM IO connector. 
+""" +# pytype: skip-file + +from __future__ import absolute_import + +import json +import time +import unittest + +from google.auth import default +from google.auth.transport import requests +from google.cloud import storage +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io import fileio +from apache_beam.io.gcp.dicomio import DicomSearch +from apache_beam.io.gcp.dicomio import UploadToDicomStore +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +REGION = 'us-central1' +DATA_SET_ID = 'apache-beam-integration-testing' +HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1' +PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store" +DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files" +NUM_INSTANCE = 18 + + +def create_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Create a an empty DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. + dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id) + + response = session.post( + dicomweb_path, params={"dicomStoreId": dicom_store_id}) + response.raise_for_status() + return response.status_code + + +def delete_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Delete an existing DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. 
+ dicomweb_path = "{}/datasets/{}/dicomStores/{}".format( + api_endpoint, dataset_id, dicom_store_id) + + response = session.delete(dicomweb_path) + response.raise_for_status() + return response.status_code + + +class DICOMIoIntegrationTest(unittest.TestCase): + expected_output_metadata = None + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) +self.project = self.test_pipeline.get_option('project') + +# create a temp Dicom store based on the time stamp +self.temp_dicom_store = "DICOM_store_" + str(time.time()) +create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store) +client = storage.Client() +bucket = client.get_bucket('temp-storage-for-dicom-io-tests') +blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json') +data = json.loads(blob.download_as_string()) +self.expected_output_metadata = data + + def tearDown(self): +# clean up the temp Dicom store +delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store) + + @attr('IT') + def test_dicom_search(self): +# Search and compare the metadata of a persistent DICOM store. +input_dict = {} +input_dict['project_id'] = self.project +input_dict['region'] = REGION +input_dict['dataset_id'] = DATA_SET_ID +input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME +input_dict['search_type'] = "instances" + +expected_dict = {} +expected_dict['result'] = self.expected_output_metadata +expected_dict['status'] = 200 +expected_dict['input'] = input_dict +expected_dict['success'] = True + +with TestPipeline() as p: + results = (p | beam.Create([input_dict]) | DicomSearch()) + assert_that(results, equal_to([expected_dict])) + + @attr('IT') + def test_dicom_store_instance(self): +# Store DICOM files to a empty DICOM store from a GCS bucket, +# then check if the store metadata match. +input_dict = {} +input_dict['project_id'] = self.project +input_dict['region'] = REGION +input_dict['dataset_id'] = DATA_SET_ID +input_dict['dico
[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466104661

## File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py ##
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create a an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp Dicom store based on the time stamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+    client = storage.Client()
+    bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+    blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+    data = json.loads(blob.download_as_string())
+    self.expected_output_metadata = data
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):

Review comment:
test case added
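The two helpers in the quoted diff build the same Healthcare API URL by hand. As a minimal sketch (not part of the PR), the path construction could be factored into a pure function so that it can be checked without credentials or network access. `dicom_store_url` is a hypothetical name; only the base URL and the path layout are taken from the diff.

```python
HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'


def dicom_store_url(project_id, dataset_id, region, dicom_store_id=None):
  """Builds the dicomStores collection URL, or one store's URL if an id is given.

  Hypothetical helper: it mirrors the string formatting used by
  create_dicom_store and delete_dicom_store in the diff.
  """
  api_endpoint = "{}/projects/{}/locations/{}".format(
      HEALTHCARE_BASE_URL, project_id, region)
  collection = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
  if dicom_store_id is None:
    # Collection URL: the target of the POST that creates a store.
    return collection
  # Single-store URL: the target of the DELETE that removes a store.
  return "{}/{}".format(collection, dicom_store_id)


print(dicom_store_url('my-project', 'my-dataset', 'us-central1'))
print(dicom_store_url('my-project', 'my-dataset', 'us-central1', 'store-1'))
```

With a helper like this, the create and delete functions would differ only in the HTTP verb and in whether a store id is passed.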
[GitHub] [beam] ibzib opened a new pull request #12477: [BEAM-10646] Simplify SparkPortableExecutionTest.
ibzib opened a new pull request #12477: URL: https://github.com/apache/beam/pull/12477 - Remove SparkPortableExecutionTest.testExecution. … - Everything SparkPortableExecutionTest.testExecution tests is already covered by validates runner tests. - Don't wait for test to time out if pipeline fails. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch)

Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
--- | --- | --- | --- | --- | --- | ---
Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam
[GitHub] [beam] youngoli commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK
youngoli commented on a change in pull request #12445: URL: https://github.com/apache/beam/pull/12445#discussion_r466056636 ## File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go ## @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// xlang_wordcount exemplifies using a cross language transform from Python to count words +package main Review comment: This example requires running an expansion service separately in order to work, right? I'd add instructions to the package comment on how to run that so people can run this example without existing knowledge of how xlang works. See the stringsplit example for an example of this. It requires running on a job service that supports splitting, so I included instructions for running an external job service. ## File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go ## @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// xlang_wordcount exemplifies using a cross language transform from Python to count words +package main + +import ( + "context" + "flag" + "fmt" + "log" + "regexp" + "strings" + + "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" + + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/io/textio" + "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" + + // Imports to enable correct filesystem access and runner setup in LOOPBACK mode + _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs" + _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local" + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal" +) + +var ( + // Set this option to choose a different input file or glob. + input = flag.String("input", "./input", "File(s) to read.") + + // Set this required option to specify where to write the output. + output = flag.String("output", "./output", "Output file (required).") +) + +var ( + wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) + empty = beam.NewCounter("extract", "emptyLines") + lineLen = beam.NewDistribution("extract", "lineLenDistro") +) + +// extractFn is a DoFn that emits the words in a given line. 
+func extractFn(ctx context.Context, line string, emit func(string)) { + lineLen.Update(ctx, int64(len(line))) + if len(strings.TrimSpace(line)) == 0 { + empty.Inc(ctx, 1) + } + for _, word := range wordRE.FindAllString(line, -1) { + emit(word) + } +} + +// formatFn is a DoFn that formats a word and its count as a string. +func formatFn(w string, c int64) string { + return fmt.Sprintf("%s: %v", w, c) +} + +func init() { + beam.RegisterFunction(extractFn) + beam.RegisterFunction(formatFn) +} + +func main() { + flag.Parse() + beam.Init() + + if *output == "" { + log.Fatal("No output provided") + } + + p := beam.NewPipeline() + s := p.Root() + + lines := textio.Read(s, *input) + col := beam.ParDo(s, extractFn, lines) + + // Using Cross-language Count from Python's test expansion service + // TODO(pskevin): Cleaner using-face API + outputType := typex.NewKV(typex.New(reflectx.String), typex.New(reflectx.Int64)) + external := &beam.ExternalT
[GitHub] [beam] TheNeuralBit merged pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit merged pull request #12426: URL: https://github.com/apache/beam/pull/12426
[GitHub] [beam] lostluck commented on a change in pull request #12448: [BEAM-9679] Add Additional Parameters lesson to Go SDK Katas
lostluck commented on a change in pull request #12448:
URL: https://github.com/apache/beam/pull/12448#discussion_r466074785

## File path: learning/katas/go/core_transforms/windowing/additional_parameters/task.md ##
@@ -0,0 +1,77 @@
+
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements. A window is a view into a fixed beginning and
+fixed end to a set of data. In the beam model, windowing subdivides a PCollection according to the
+timestamps of its individual elements.
+
+Beam can pass information about the window and timestamp to your elements in your DoFn. All your previous
+lessons' DoFn had this information available, yet you never made use of it in your DoFn parameters. In this
+lesson you will. The simple toy dataset has five git commit messages and their timestamps

Review comment:
Beam's not passing information to the elements, it's passing it to the DoFn. Consider...
`A DoFn can request timestamp and windowing information about the element it's processing. All your previous lessons had this information available as well. In this lesson you will make use of these parameters.`
It's probably a personal nit of mine, but the "yet you" sounds accusatory (not saying that was your intent).

## File path: learning/katas/go/core_transforms/windowing/additional_parameters/task.md ##
@@ -0,0 +1,77 @@
+
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements. A window is a view into a fixed beginning and
+fixed end to a set of data. In the beam model, windowing subdivides a PCollection according to the
+timestamps of its individual elements.

Review comment:
I don't know how precise we need to be in the katas, but consider adding: "An element can be a part of one or more windows."

## File path: learning/katas/go/core_transforms/windowing/additional_parameters/task.md ##
@@ -0,0 +1,77 @@
+
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements. A window is a view into a fixed beginning and
+fixed end to a set of data. In the beam model, windowing subdivides a PCollection according to the
+timestamps of its individual elements.
+
+Beam can pass information about the window and timestamp to your elements in your DoFn. All your previous
+lessons' DoFn had this information available, yet you never made use of it in your DoFn parameters. In this
+lesson you will. The simple toy dataset has five git commit messages and their timestamps

Review comment:
I'd remove either `simple` or `toy`; they're redundant together.
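The point that an element can be a part of one or more windows can be illustrated with plain arithmetic. The following is a minimal sketch in Python that does not use the Beam SDK. The function names are hypothetical, timestamps are assumed to be integer seconds, and windows are modelled as half-open `(start, end)` pairs.

```python
def fixed_windows(timestamp, size):
  """Returns the single fixed window of the given size containing timestamp."""
  start = timestamp - (timestamp % size)
  return [(start, start + size)]


def sliding_windows(timestamp, size, period):
  """Returns every sliding window (newest first) that contains timestamp.

  A new window of length `size` starts every `period` seconds, so when
  size > period the windows overlap and one element lands in several.
  """
  last_start = timestamp - (timestamp % period)
  return [(start, start + size)
          for start in range(last_start, last_start - size, -period)]


# An element stamped at t=125s falls into exactly one 60s fixed window ...
print(fixed_windows(125, 60))        # [(120, 180)]
# ... but into two 60s windows that slide every 30s.
print(sliding_windows(125, 60, 30))  # [(120, 180), (90, 150)]
```

With fixed windows every element belongs to exactly one window; with overlapping sliding windows the same element is counted once per window that contains its timestamp.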
[GitHub] [beam] TheNeuralBit commented on pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit commented on pull request #12426: URL: https://github.com/apache/beam/pull/12426#issuecomment-669604112 Run Java PreCommit
[GitHub] [beam] reuvenlax commented on pull request #12474: OrderedListState API
reuvenlax commented on pull request #12474: URL: https://github.com/apache/beam/pull/12474#issuecomment-669603386 Run Portable_Python PreCommit
[GitHub] [beam] robertwb opened a new pull request #12476: [BEAM-10645] Create context for allowing non-parallel dataframe operations.
robertwb opened a new pull request #12476: URL: https://github.com/apache/beam/pull/12476
[GitHub] [beam] TheNeuralBit commented on pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder
TheNeuralBit commented on pull request #12426: URL: https://github.com/apache/beam/pull/12426#issuecomment-669588208 Run Java PreCommit
[GitHub] [beam] boyuanzz commented on a change in pull request #12223: [Beam-4379] Make ParquetIO read splittable
boyuanzz commented on a change in pull request #12223:
URL: https://github.com/apache/beam/pull/12223#discussion_r466020446

## File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java ##
@@ -154,10 +187,15 @@ public static ReadFiles readFiles(Schema schema) {
     abstract @Nullable GenericData getAvroDataModel();
+    abstract boolean getSplit();

Review comment:
`isSplittable`?

## File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java ##
@@ -187,12 +229,18 @@ public Read withAvroDataModel(GenericData model) {
     @Override
     public PCollection expand(PBegin input) {
       checkNotNull(getFilepattern(), "Filepattern cannot be null.");
-
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply(FileIO.matchAll())
-          .apply(FileIO.readMatches())
-          .apply(readFiles(getSchema()).withAvroDataModel(getAvroDataModel()));
+      PCollection inputFiles =
+          input
+              .apply(
+                  "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches());
+      if (!getSplit()) {

Review comment:
Should it be `if (getSplit())`?

## File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java ##
@@ -187,12 +229,18 @@ public Read withAvroDataModel(GenericData model) {
     @Override
     public PCollection expand(PBegin input) {
       checkNotNull(getFilepattern(), "Filepattern cannot be null.");
-
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply(FileIO.matchAll())
-          .apply(FileIO.readMatches())
-          .apply(readFiles(getSchema()).withAvroDataModel(getAvroDataModel()));
+      PCollection inputFiles =
+          input
+              .apply(
+                  "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches());
+      if (!getSplit()) {
+        return inputFiles.apply(
+            readFiles(getSchema()).withSplit().withAvroDataModel(getAvroDataModel()));
+      } else {

Review comment:
We can drop this redundant `else`.
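Taken together, the last two comments suggest branching on the flag itself and returning early, so that no `else` is needed. The following is a sketch of that control flow only, written in Python with hypothetical stand-in names (`expand`, `read_files`, `is_splittable`); the real change would live in the Java `expand` method, and whether the original condition is actually inverted is the reviewer's open question.

```python
def expand(input_files, is_splittable, read_files):
  """Picks the read path for a set of matched files.

  All names are hypothetical stand-ins for the Java code in the diff.
  """
  if is_splittable:
    # Splittable read requested: return immediately.
    return read_files(input_files, split=True)
  # No else needed: this line only runs when the branch above did not return.
  return read_files(input_files, split=False)


def fake_read_files(files, split):
  # Stand-in reader that only records which path was chosen.
  return ('split' if split else 'whole', list(files))


print(expand(['a.parquet'], True, fake_read_files))
print(expand(['a.parquet'], False, fake_read_files))
```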
[GitHub] [beam] lukecwik commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
lukecwik commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r466022189 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -198,6 +209,102 @@ *... * } * + * Read from Kafka as a {@link DoFn} + * + * {@link ReadSourceDescriptors} is the {@link PTransform} that takes a PCollection of {@link + * KafkaSourceDescriptor} as input and outputs a PCollection of {@link KafkaRecord}. The core + * implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code + * SplittableDoFn}, please refer to the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link + * KafkaIO.Read} is, {@link ReadSourceDescriptors} doesn't require source descriptions(e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the + * pipeline can populate these source descriptions during runtime. For example, the pipeline can + * query Kafka topics from a BigQuery table and read these topics via {@link ReadSourceDescriptors}. + * + * Common Kafka Consumer Configurations + * + * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}: + * + * + * {@link ReadSourceDescriptors#getConsumerConfig()} is the same as {@link + * KafkaIO.Read#getConsumerConfig()}. + * {@link ReadSourceDescriptors#getConsumerFactoryFn()} is the same as {@link + * KafkaIO.Read#getConsumerFactoryFn()}. + * {@link ReadSourceDescriptors#getOffsetConsumerConfig()} is the same as {@link + * KafkaIO.Read#getOffsetConsumerConfig()}. + * {@link ReadSourceDescriptors#getKeyCoder()} is the same as {@link + * KafkaIO.Read#getKeyCoder()}. + * {@link ReadSourceDescriptors#getValueCoder()} is the same as {@link + * KafkaIO.Read#getValueCoder()}. 
+ * {@link ReadSourceDescriptors#getKeyDeserializerProvider()} is the same as {@link + * KafkaIO.Read#getKeyDeserializerProvider()}. + * {@link ReadSourceDescriptors#getValueDeserializerProvider()} is the same as {@link + * KafkaIO.Read#getValueDeserializerProvider()}. + * {@link ReadSourceDescriptors#isCommitOffsetEnabled()} has the same meaning as {@link + * KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}. + * + * + * For example, to create a basic {@link ReadSourceDescriptors} transform: + * + * {@code + * pipeline + * .apply(Create.of(KafkaSourceDescriptor.of(new TopicPartition("topic", 1))) + * .apply(KafkaIO.readAll() + * .withBootstrapServers("broker_1:9092,broker_2:9092") + * .withKeyDeserializer(LongDeserializer.class). + * .withValueDeserializer(StringDeserializer.class)); + * } + * + * Note that the {@code bootstrapServers} can also be populated from the {@link + * KafkaSourceDescriptor}: + * + * {@code + * pipeline + * .apply(Create.of( + *KafkaSourceDescriptor.of( + * new TopicPartition("topic", 1), + * null, + * null, + * ImmutableList.of("broker_1:9092", "broker_2:9092")) + * .apply(KafkaIO.readAll() + * .withKeyDeserializer(LongDeserializer.class). + * .withValueDeserializer(StringDeserializer.class)); + * } + * + * Configurations of {@link ReadSourceDescriptors} + * + * Except configurations of Kafka Consumer, there are some other configurations which are related + * to processing records. + * + * {@link ReadSourceDescriptors#commitOffsets()} enables committing offset after processing the + * record. Note that if the {@code isolation.level} is set to "read_committed" or {@link + * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, the {@link + * ReadSourceDescriptors#commitOffsets()} will be ignored. + * + * {@link ReadSourceDescriptors#withExtractOutputTimestampFn(SerializableFunction)} is used to + * compute the {@code output timestamp} for a given {@link KafkaRecord}. 
There are three built-in + * types: {@link ReadSourceDescriptors#withProcessingTime()}, {@link + * ReadSourceDescriptors#withCreateTime()} and {@link ReadSourceDescriptors#withLogAppendTime()}. Review comment: Use an unordered list for the options: `` ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -198,6 +209,102 @@ *... * } * + * Read from Kafka as a {@link DoFn} + * + * {@link ReadSourceDescriptors} is the {@link PTransform} that takes a PCollection of {@link Review comment: We should state the order of preference for how we start reading (offset/timestamp/last committed offset) somewhere in this block. ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + *
[GitHub] [beam] codeBehindMe commented on pull request #12170: [BEAM-10166] Improve execution time errors with more concise error messages from DoFns
codeBehindMe commented on pull request #12170: URL: https://github.com/apache/beam/pull/12170#issuecomment-669553446 thank @lostluck , I will implement this today / tomorrow. Apologies, personal commitments have been getting in the way recently.
[GitHub] [beam] codeBehindMe edited a comment on pull request #12170: [BEAM-10166] Improve execution time errors with more concise error messages from DoFns
codeBehindMe edited a comment on pull request #12170: URL: https://github.com/apache/beam/pull/12170#issuecomment-669553446 thanks @lostluck , I will implement this today / tomorrow. Apologies, personal commitments have been getting in the way recently.
[GitHub] [beam] apilloud commented on a change in pull request #12475: Update nexmark dashboard links.
apilloud commented on a change in pull request #12475: URL: https://github.com/apache/beam/pull/12475#discussion_r466019162 ## File path: website/www/site/content/en/documentation/sdks/java/testing/nexmark.md ## @@ -632,30 +632,10 @@ There are dashboards for these runners (others to come): - spark - flink - direct runner +- Dataflow Each dashboard contains: - graphs in batch mode - graphs in streaming mode - graphs for all the queries. -### Performance dashboards links - -[Nexmark performance direct runner](https://apache-beam-testing.appspot.com/explore?dashboard=5084698770407424) Review comment: The equivalent dashboards don't exist yet. I filed https://issues.apache.org/jira/browse/BEAM-10643
[GitHub] [beam] tvalentyn commented on a change in pull request #12239: [BEAM-9980] tests tied with Python versions configurable
tvalentyn commented on a change in pull request #12239: URL: https://github.com/apache/beam/pull/12239#discussion_r466016946 ## File path: sdks/python/test-suites/dataflow/build.gradle ## @@ -0,0 +1,50 @@ +/* Review comment: It seems that we need to make changes to the top-level suite definitions, e.g.: https://github.com/apache/beam/blob/ba380270dbc8854ad44f6636a6767317a0416a99/build.gradle#L216 to point to /test-suites//build.gradle files.
[GitHub] [beam] aaltay commented on a change in pull request #12475: Update nexmark dashboard links.
aaltay commented on a change in pull request #12475: URL: https://github.com/apache/beam/pull/12475#discussion_r466015852 ## File path: website/www/site/content/en/documentation/sdks/java/testing/nexmark.md ## @@ -632,30 +632,10 @@ There are dashboards for these runners (others to come): - spark - flink - direct runner +- Dataflow Each dashboard contains: - graphs in batch mode - graphs in streaming mode - graphs for all the queries. -### Performance dashboards links - -[Nexmark performance direct runner](https://apache-beam-testing.appspot.com/explore?dashboard=5084698770407424) Review comment: Can we deep link similarly in the new dashboards?
[GitHub] [beam] tvalentyn commented on a change in pull request #12239: [BEAM-9980] tests tied with Python versions configurable
tvalentyn commented on a change in pull request #12239: URL: https://github.com/apache/beam/pull/12239#discussion_r466015279 ## File path: sdks/python/test-suites/dataflow/build.gradle ## @@ -0,0 +1,50 @@ +/* Review comment: Never mind, I see that they serve different purposes.
[GitHub] [beam] tvalentyn commented on a change in pull request #12239: [BEAM-9980] tests tied with Python versions configurable
tvalentyn commented on a change in pull request #12239: URL: https://github.com/apache/beam/pull/12239#discussion_r466012301 ## File path: sdks/python/test-suites/dataflow/build.gradle ## @@ -0,0 +1,50 @@ +/* Review comment: Why do we need build.gradle in addition to common.gradle?
[GitHub] [beam] tysonjh opened a new pull request #12475: Update nexmark dashboard links.
tysonjh opened a new pull request #12475: URL: https://github.com/apache/beam/pull/12475 The App Engine app that hosted the old perf benchmarks was turned down. The new dashboards are available at http://metrics.beam.apache.org.
[GitHub] [beam] DanKotowski commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test
DanKotowski commented on a change in pull request #12473: URL: https://github.com/apache/beam/pull/12473#discussion_r465976291 ## File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py ## @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration test for Google Cloud DICOM IO connector. 
+""" +# pytype: skip-file + +from __future__ import absolute_import + +import json +import time +import unittest + +from google.auth import default +from google.auth.transport import requests +from google.cloud import storage +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io import fileio +from apache_beam.io.gcp.dicomio import DicomSearch +from apache_beam.io.gcp.dicomio import UploadToDicomStore +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +REGION = 'us-central1' +DATA_SET_ID = 'apache-beam-integration-testing' +HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1' +PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store" +DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files" +NUM_INSTANCE = 18 + + +def create_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Create a an empty DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. + dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id) + + response = session.post( + dicomweb_path, params={"dicomStoreId": dicom_store_id}) + response.raise_for_status() + return response.status_code + + +def delete_dicom_store(project_id, dataset_id, region, dicom_store_id): + # Delete an existing DICOM store + credential, _ = default() + session = requests.AuthorizedSession(credential) + api_endpoint = "{}/projects/{}/locations/{}".format( + HEALTHCARE_BASE_URL, project_id, region) + + # base of dicomweb path. 
+ dicomweb_path = "{}/datasets/{}/dicomStores/{}".format( + api_endpoint, dataset_id, dicom_store_id) + + response = session.delete(dicomweb_path) + response.raise_for_status() + return response.status_code + + +class DICOMIoIntegrationTest(unittest.TestCase): + expected_output_metadata = None + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) +self.project = self.test_pipeline.get_option('project') + +# create a temp Dicom store based on the time stamp +self.temp_dicom_store = "DICOM_store_" + str(time.time()) Review comment: If tests are run in parallel potentially the 2 stores could have the same name because you are using time. It may be best to generate a random string. ## File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py ## @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration test for Google Cloud DICOM IO connector. 
+""" +# pytype: skip-file + +from __future__ import absolute_import + +import json +import time +import unittest + +from google.auth import default +from google.auth.transport import requests +from google.cloud import storage +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io import fileio +from apache_beam.io.gcp.dicomio import DicomSearch +from apache_be
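The collision concern raised in the review comment above (two parallel test runs deriving the same store name from `time.time()`) can be illustrated with a short sketch. This is not code from the PR: the helper name `unique_dicom_store_name` is hypothetical, and the only assumption is that a random suffix from Python's standard `uuid` module is acceptable in a store name.

```python
import uuid


def unique_dicom_store_name(prefix="DICOM_store_"):
  """Return a store name with a random suffix (hypothetical helper)."""
  # uuid4() is built from random bits, so two test runs started at the
  # same instant are extremely unlikely to share a suffix, whereas
  # str(time.time()) can repeat across parallel workers.
  return prefix + uuid.uuid4().hex


# Two names requested back to back still differ.
first = unique_dicom_store_name()
second = unique_dicom_store_name()
```

In the test's `setUp`, the time-based assignment could then become something like `self.temp_dicom_store = unique_dicom_store_name()`, keeping the readable prefix while dropping the dependence on the clock.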
[GitHub] [beam] reuvenlax opened a new pull request #12474: OrderedListState API
reuvenlax opened a new pull request #12474: URL: https://github.com/apache/beam/pull/12474 This PR adds the API and an in-memory implementation for the timestamp-ordered list state. The API is currently marked experimental and is still subject to change.
[GitHub] [beam] youngoli closed pull request #12046: [BEAM-10290] Add a unit(ish) test for dynamic splitting correctness.
youngoli closed pull request #12046: URL: https://github.com/apache/beam/pull/12046
[GitHub] [beam] amaliujia commented on pull request #12433: [BEAM-10613] Implement BIT_AND for Beam SQL ZetaSQL dialect as CombineFn
amaliujia commented on pull request #12433: URL: https://github.com/apache/beam/pull/12433#issuecomment-669466436 My impression was bit_and crashed due to this special while others didn't (or not tested). That was why bit_and was disabled.
[GitHub] [beam] amaliujia edited a comment on pull request #12433: [BEAM-10613] Implement BIT_AND for Beam SQL ZetaSQL dialect as CombineFn
amaliujia edited a comment on pull request #12433: URL: https://github.com/apache/beam/pull/12433#issuecomment-669466436 My impression was that bit_and crashed due to this special case while the others didn't (or weren't tested). That was why bit_and was disabled.
[GitHub] [beam] lostluck commented on pull request #12046: [BEAM-10290] Add a unit(ish) test for dynamic splitting correctness.
lostluck commented on pull request #12046: URL: https://github.com/apache/beam/pull/12046#issuecomment-669466490 Since the other PR was merged in, this can be closed, right?
[GitHub] [beam] lostluck commented on pull request #12170: [BEAM-10166] Improve execution time errors with more concise error messages from DoFns
lostluck commented on pull request #12170: URL: https://github.com/apache/beam/pull/12170#issuecomment-669466247 This looks good to me! Thank you for the contribution! Would you mind adding a test to validate this behaviour? exec/util_test.go doesn't currently exist, so it would need to be added. There's no need to test everything in exec/util.go, just callNoPanic. You can test three cases:
* One where the passed-in func(context.Context) error doesn't panic, and the error is just returned as is (they should compare as equal for a simple error).
* Another where the passed-in function panics, which should return an error that contains "panic: ".
* Finally, one where the error passed to panic is wrapped in your new doFnError error type.
[GitHub] [beam] mxm commented on a change in pull request #12455: [BEAM-10630] Include data from load tests in the release process
mxm commented on a change in pull request #12455: URL: https://github.com/apache/beam/pull/12455#discussion_r465968286 ## File path: website/www/site/content/en/contribute/release-guide.md ## @@ -244,7 +247,21 @@ __Attention__: Only PMC has permission to perform this. If you are not a PMC, pl ** -## 2. Create a release branch in apache/beam repository +## 3. Investigate performance regressions + +Check the Beam load tests for possible performance regressions. Measurements are available on [metrics.beam.apache.org](http://metrics.beam.apache.org). + +All Runners which publish data should be checked for the following, in both *batch* and *streaming* mode: + +- [ParDo](http://metrics.beam.apache.org/d/MOi-kf3Zk/pardo-load-tests) and [GBK](http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-test): Runtime, latency, checkpoint duration +- [Nexmark](http://metrics.beam.apache.org/d/ahuaA_zGz/nexmark): Query runtime for all queries +- [IO](http://metrics.beam.apache.org/d/bnlHKP3Wz/java-io-it-tests-dataflow): Runtime + +If regressions are found, the release branch can still be created, but the regressions should be investigated and fixed as part of the release process. +JIRA issues should be created for each regression with the 'Fix Version' set to the to-be-released version. +Next, the mailing list should be informed to allow fixing the regressions in the course of the release. Review comment: I've pushed two more commits to address these two points.
[GitHub] [beam] mxm commented on a change in pull request #12455: [BEAM-10630] Include data from load tests in the release process
mxm commented on a change in pull request #12455: URL: https://github.com/apache/beam/pull/12455#discussion_r465959895 ## File path: website/www/site/content/en/contribute/release-guide.md ## @@ -244,7 +247,21 @@ __Attention__: Only PMC has permission to perform this. If you are not a PMC, pl ** -## 2. Create a release branch in apache/beam repository +## 3. Investigate performance regressions + +Check the Beam load tests for possible performance regressions. Measurements are available on [metrics.beam.apache.org](http://metrics.beam.apache.org). + +All Runners which publish data should be checked for the following, in both *batch* and *streaming* mode: + +- [ParDo](http://metrics.beam.apache.org/d/MOi-kf3Zk/pardo-load-tests) and [GBK](http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-test): Runtime, latency, checkpoint duration +- [Nexmark](http://metrics.beam.apache.org/d/ahuaA_zGz/nexmark): Query runtime for all queries +- [IO](http://metrics.beam.apache.org/d/bnlHKP3Wz/java-io-it-tests-dataflow): Runtime + +If regressions are found, the release branch can still be created, but the regressions should be investigated and fixed as part of the release process. +JIRA issues should be created for each regression with the 'Fix Version' set to the to-be-released version. Review comment: I'd like to try filing regression JIRA issues for all listed benchmarks. Hopefully, that should not be too many. Like you said, the issues will be triaged. If there is nobody taking care of an issue, then ultimately we can think about removing the associated benchmark.
[GitHub] [beam] George-Wu opened a new pull request #12473: [BEAM-10642] DICOM API Beam IO connector e2e test
George-Wu opened a new pull request #12473: URL: https://github.com/apache/beam/pull/12473 Add integration test for DICOM Beam IO connector R:@pabloem, CC:@dranderson1117,@DanKotowski
[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK
pskevin commented on a change in pull request #12445: URL: https://github.com/apache/beam/pull/12445#discussion_r465934906 ## File path: sdks/go/pkg/beam/external.go ## @@ -16,10 +16,144 @@ package beam import ( + "context" + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" + jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1" + pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" + "google.golang.org/grpc" ) +// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively +type ExternalTransform struct { + id int + Urn string + Payload []byte + In []PCollection + Out []FullType + Bounded bool + ExpansionAddr string + Components *pipepb.Components + ExpandedTransform *pipepb.PTransform + Requirements []string Review comment: Awesome suggestion. Will work on it.
[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK
pskevin commented on a change in pull request #12445: URL: https://github.com/apache/beam/pull/12445#discussion_r465934590 ## File path: sdks/go/pkg/beam/external.go ## @@ -16,10 +16,144 @@ package beam import ( + "context" + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" + jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1" + pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" + "google.golang.org/grpc" ) +// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively +type ExternalTransform struct { Review comment: The current version was never meant to be representative of what the final API would be. Only after discussing code organization was the API surface meant to be refined. Thanks for the new type idea! It'll make separation of concerns much clearer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] robertwb merged pull request #12459: [BEAM-9547] Simplify pandas implementation.
robertwb merged pull request #12459: URL: https://github.com/apache/beam/pull/12459
[GitHub] [beam] yifanmai commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases
yifanmai commented on pull request #12185: URL: https://github.com/apache/beam/pull/12185#issuecomment-669367028 Removed sibling deduplication. @robertwb and @aaltay could you take another look? Also, filed [BEAM-10641](https://issues.apache.org/jira/browse/BEAM-10641) for adding this to Dataflow.
[GitHub] [beam] aaltay commented on pull request #12472: [BEAM-7390] Add sum code snippets
aaltay commented on pull request #12472: URL: https://github.com/apache/beam/pull/12472#issuecomment-669363089 LGTM. Please ping me after @rosetn reviews.
[GitHub] [beam] aaltay commented on pull request #12453: [BEAM-9421] Add Java snippets to NLP documentation.
aaltay commented on pull request #12453: URL: https://github.com/apache/beam/pull/12453#issuecomment-669359409
> @aaltay I think testing this would increase test time and introduce risk of flakes - the cloud language analysis results aren't very reliable (since the ML model underneath can change) - the most we could test would be if the pipeline is correct, running and returns anything.
Snippet tests are not meant to be integration tests, but more like a way to check that snippets are still executable over time. Is there anything we can test without actually making a round trip to the service? If that is not possible, let's skip tests.
[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases
yifanmai commented on a change in pull request #12185: URL: https://github.com/apache/beam/pull/12185#discussion_r465917488 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py ## @@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context): return stages +def eliminate_common_siblings(stages, context): + # type: (Iterable[Stage], TransformContext) -> Iterable[Stage] + """Runs common subexpression elimination for common siblings. + + If stages have common input, an identical transform, and one output each, + then all but one stages will be eliminated, and the output of the remaining + will be connected to the original output PCollections of the eliminated + stages. This elimination runs only once, not recursively, and will only + eliminate the first stage after a common input, rather than a chain of + stages. + """ + + SiblingKey = collections.namedtuple( + 'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id']) + + def get_sibling_key(transform): +"""Returns a key that will be identical for common siblings.""" +transform_output_keys = list(transform.outputs.keys()) +# Return None as the sibling key for ineligible transforms. +if len(transform_output_keys + ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn: + return None +return SiblingKey( +spec_urn=transform.spec.urn, +spec_payload=transform.spec.payload, +inputs=tuple(transform.inputs.items()), +environment_id=transform.environment_id) + + # Group stages by keys. + stages_by_sibling_key = collections.defaultdict(list) + for stage in stages: +transform = only_transform(stage.transforms) +stages_by_sibling_key[get_sibling_key(transform)].append(stage) + + # Eliminate stages and build the output PCollection remapping dictionary. 
+ pcoll_id_remap = {} + for sibling_key, sibling_stages in stages_by_sibling_key.items(): +if sibling_key is None or len(sibling_stages) == 1: + continue +output_pcoll_ids = [ +only_element(stage.transforms[0].outputs.values()) +for stage in sibling_stages +] +to_delete_pcoll_ids = output_pcoll_ids[1:] +for to_delete_pcoll_id in to_delete_pcoll_ids: + pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0] + del context.components.pcollections[to_delete_pcoll_id] +del sibling_stages[1:] + + # Yield stages while remapping output PCollections if needed. + for sibling_key, sibling_stages in stages_by_sibling_key.items(): +for stage in sibling_stages: + input_keys_to_remap = [] + for input_key, input_pcoll_id in stage.transforms[0].inputs.items(): +if input_pcoll_id in pcoll_id_remap: + input_keys_to_remap.append(input_key) + for input_key_to_remap in input_keys_to_remap: +stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[ +stage.transforms[0].inputs[input_key_to_remap]] + yield stage + + +def pack_combiners(stages, context): + # type: (Iterable[Stage], TransformContext) -> Iterator[Stage] + """Packs sibling CombinePerKey stages into a single CombinePerKey. + + If CombinePerKey stages have a common input, one input each, and one output + each, pack the stages into a single stage that runs all CombinePerKeys and + outputs resulting tuples to a new PCollection. A subsequent stage unpacks + tuples from this PCollection and sends them to the original output + PCollections. + """ + + class _UnpackFn(core.DoFn): +"""A DoFn that unpacks a packed to multiple tagged outputs. + +Example: + tags = (T1, T2, ...) + input = (K, (V1, V2, ...)) + output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ... 
+""" + +def __init__(self, tags): + self._tags = tags + +def process(self, element): + key, values = element + return [ + core.pvalue.TaggedOutput(tag, (key, value)) + for tag, value in zip(self._tags, values) + ] + + def _get_fallback_coder_id(): +return context.add_or_get_coder_id( +coders.registry.get_coder(object).to_runner_api(None)) + + def _get_component_coder_id_from_kv_coder(coder, index): +assert index < 2 +if coder.spec.urn == common_urns.coders.KV.urn and len( +coder.component_coder_ids) == 2: + return coder.component_coder_ids[index] +return _get_fallback_coder_id() + + def _get_key_coder_id_from_kv_coder(coder): +return _get_component_coder_id_from_kv_coder(coder, 0) + + def _get_value_coder_id_from_kv_coder(coder): +return _get_component_coder_id_from_kv_coder(coder, 1) + + def _try_fuse_stages(a, b): +if a.can_fuse(b, context): + return a.fuse(b) +else: + raise ValueError + + def _try_merge_environments(env1, env2): +if env1 is None: + return env2 +elif env2 is None: + return env1 +else: + if env1 != env2: +
[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases
yifanmai commented on a change in pull request #12185: URL: https://github.com/apache/beam/pull/12185#discussion_r465915603 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py ## @@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context): return stages +def eliminate_common_siblings(stages, context): + # type: (Iterable[Stage], TransformContext) -> Iterable[Stage] + """Runs common subexpression elimination for common siblings. + + If stages have common input, an identical transform, and one output each, + then all but one stages will be eliminated, and the output of the remaining + will be connected to the original output PCollections of the eliminated + stages. This elimination runs only once, not recursively, and will only + eliminate the first stage after a common input, rather than a chain of + stages. + """ + + SiblingKey = collections.namedtuple( + 'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id']) + + def get_sibling_key(transform): +"""Returns a key that will be identical for common siblings.""" +transform_output_keys = list(transform.outputs.keys()) +# Return None as the sibling key for ineligible transforms. +if len(transform_output_keys + ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn: + return None +return SiblingKey( +spec_urn=transform.spec.urn, +spec_payload=transform.spec.payload, +inputs=tuple(transform.inputs.items()), +environment_id=transform.environment_id) + + # Group stages by keys. + stages_by_sibling_key = collections.defaultdict(list) + for stage in stages: +transform = only_transform(stage.transforms) +stages_by_sibling_key[get_sibling_key(transform)].append(stage) + + # Eliminate stages and build the output PCollection remapping dictionary. 
+ pcoll_id_remap = {} + for sibling_key, sibling_stages in stages_by_sibling_key.items(): +if sibling_key is None or len(sibling_stages) == 1: + continue +output_pcoll_ids = [ +only_element(stage.transforms[0].outputs.values()) +for stage in sibling_stages +] +to_delete_pcoll_ids = output_pcoll_ids[1:] +for to_delete_pcoll_id in to_delete_pcoll_ids: + pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0] + del context.components.pcollections[to_delete_pcoll_id] +del sibling_stages[1:] + + # Yield stages while remapping output PCollections if needed. + for sibling_key, sibling_stages in stages_by_sibling_key.items(): +for stage in sibling_stages: + input_keys_to_remap = [] + for input_key, input_pcoll_id in stage.transforms[0].inputs.items(): +if input_pcoll_id in pcoll_id_remap: + input_keys_to_remap.append(input_key) + for input_key_to_remap in input_keys_to_remap: +stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[ +stage.transforms[0].inputs[input_key_to_remap]] + yield stage + + +def pack_combiners(stages, context): + # type: (Iterable[Stage], TransformContext) -> Iterator[Stage] + """Packs sibling CombinePerKey stages into a single CombinePerKey. + + If CombinePerKey stages have a common input, one input each, and one output + each, pack the stages into a single stage that runs all CombinePerKeys and + outputs resulting tuples to a new PCollection. A subsequent stage unpacks + tuples from this PCollection and sends them to the original output + PCollections. + """ + + class _UnpackFn(core.DoFn): +"""A DoFn that unpacks a packed to multiple tagged outputs. + +Example: + tags = (T1, T2, ...) + input = (K, (V1, V2, ...)) + output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ... 
+""" + +def __init__(self, tags): + self._tags = tags + +def process(self, element): + key, values = element + return [ + core.pvalue.TaggedOutput(tag, (key, value)) + for tag, value in zip(self._tags, values) + ] + + def _get_fallback_coder_id(): +return context.add_or_get_coder_id( +coders.registry.get_coder(object).to_runner_api(None)) + + def _get_component_coder_id_from_kv_coder(coder, index): +assert index < 2 +if coder.spec.urn == common_urns.coders.KV.urn and len( +coder.component_coder_ids) == 2: + return coder.component_coder_ids[index] +return _get_fallback_coder_id() + + def _get_key_coder_id_from_kv_coder(coder): +return _get_component_coder_id_from_kv_coder(coder, 0) + + def _get_value_coder_id_from_kv_coder(coder): +return _get_component_coder_id_from_kv_coder(coder, 1) + + def _try_fuse_stages(a, b): +if a.can_fuse(b, context): + return a.fuse(b) +else: + raise ValueError + + def _try_merge_environments(env1, env2): Review comment: Yup, replaced with `Stage._merge_environments` instead.
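The unpacking step described in the `_UnpackFn` docstring above can be sketched without Beam as follows. This is only an illustration: `TaggedOutput` here is a local stand-in for `apache_beam.pvalue.TaggedOutput`, and `unpack` is a hypothetical free function mirroring the quoted `process` method. Note that the quoted docstring pairs `T2` with `V1`, which looks like a typo for `V2`; the sketch pairs each tag with the value at the same position.

```python
from collections import namedtuple

# Local stand-in for apache_beam.pvalue.TaggedOutput (assumption: only the
# tag/value pairing matters for this illustration).
TaggedOutput = namedtuple('TaggedOutput', ['tag', 'value'])


def unpack(tags, element):
    """Splits (key, (v1, v2, ...)) into one tagged (key, v_i) pair per tag."""
    key, values = element
    # zip() pairs the i-th tag with the i-th packed value.
    return [TaggedOutput(tag, (key, value)) for tag, value in zip(tags, values)]


outputs = unpack(('T1', 'T2'), ('k', (10, 20)))
for out in outputs:
    print(out.tag, out.value)
```

Each packed combiner result ends up on the output tagged for its original CombinePerKey, which is what lets a later stage route the values back to the original output PCollections.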
[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases
yifanmai commented on a change in pull request #12185: URL: https://github.com/apache/beam/pull/12185#discussion_r457799457 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -289,6 +289,8 @@ def create_stages( phases=[ translations.annotate_downstream_side_inputs, translations.fix_side_input_pcoll_coders, +translations.eliminate_common_siblings, Review comment: As we discussed in a separate conversation, we would probably need a design or RFC for annotating DoFns as idempotent. As such, I've removed `eliminate_common_siblings` from this PR.
[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases
yifanmai commented on a change in pull request #12185: URL: https://github.com/apache/beam/pull/12185#discussion_r465914422 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py ## @@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context): return stages +def eliminate_common_siblings(stages, context): + # type: (Iterable[Stage], TransformContext) -> Iterable[Stage] + """Runs common subexpression elimination for common siblings. + + If stages have common input, an identical transform, and one output each, + then all but one stages will be eliminated, and the output of the remaining + will be connected to the original output PCollections of the eliminated + stages. This elimination runs only once, not recursively, and will only + eliminate the first stage after a common input, rather than a chain of + stages. + """ + + SiblingKey = collections.namedtuple( + 'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id']) + + def get_sibling_key(transform): +"""Returns a key that will be identical for common siblings.""" +transform_output_keys = list(transform.outputs.keys()) +# Return None as the sibling key for ineligible transforms. +if len(transform_output_keys + ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn: + return None +return SiblingKey( +spec_urn=transform.spec.urn, +spec_payload=transform.spec.payload, +inputs=tuple(transform.inputs.items()), +environment_id=transform.environment_id) + + # Group stages by keys. + stages_by_sibling_key = collections.defaultdict(list) + for stage in stages: +transform = only_transform(stage.transforms) +stages_by_sibling_key[get_sibling_key(transform)].append(stage) + + # Eliminate stages and build the output PCollection remapping dictionary. + pcoll_id_remap = {} Review comment: Noted. I'm not currently aware of any requirement to keep this around outside this optimizer.
[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases
yifanmai commented on a change in pull request #12185: URL: https://github.com/apache/beam/pull/12185#discussion_r465913553 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py ## @@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context): return stages +def eliminate_common_siblings(stages, context): + # type: (Iterable[Stage], TransformContext) -> Iterable[Stage] + """Runs common subexpression elimination for common siblings. + + If stages have common input, an identical transform, and one output each, + then all but one stages will be eliminated, and the output of the remaining + will be connected to the original output PCollections of the eliminated + stages. This elimination runs only once, not recursively, and will only + eliminate the first stage after a common input, rather than a chain of + stages. + """ + + SiblingKey = collections.namedtuple( + 'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id']) + + def get_sibling_key(transform): +"""Returns a key that will be identical for common siblings.""" +transform_output_keys = list(transform.outputs.keys()) +# Return None as the sibling key for ineligible transforms. +if len(transform_output_keys + ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn: + return None +return SiblingKey( +spec_urn=transform.spec.urn, +spec_payload=transform.spec.payload, +inputs=tuple(transform.inputs.items()), +environment_id=transform.environment_id) Review comment: Noted. Deleted code due to removal of `eliminate_common_siblings`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
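For readers following the thread, the sibling-elimination pass quoted above (since dropped from the PR) boils down to "group by key, keep one, remap the rest". The following is a simplified sketch under stated assumptions, not the PR's code: `Stage` is a hypothetical flat record rather than Beam's `Stage` class, the key omits `environment_id`, and the ParDo-only and single-output eligibility checks are left out.

```python
import collections

# Hypothetical flat record; the real pass works on Beam Stage objects and protos.
Stage = collections.namedtuple(
    'Stage', ['name', 'urn', 'payload', 'inputs', 'output'])


def eliminate_common_siblings(stages):
    # Group stages that apply the same transform (urn + payload) to the same inputs.
    groups = {}
    for stage in stages:
        key = (stage.urn, stage.payload, stage.inputs)
        groups.setdefault(key, []).append(stage)

    # Keep the first stage of each group and record where dropped outputs now live.
    remap = {}
    survivors = []
    for siblings in groups.values():
        survivors.append(siblings[0])
        for duplicate in siblings[1:]:
            remap[duplicate.output] = siblings[0].output

    # Rewire consumers of dropped PCollections to the surviving output.
    return [
        s._replace(inputs=tuple(remap.get(i, i) for i in s.inputs))
        for s in survivors
    ]


stages = [
    Stage('a', 'pardo', b'f', ('in',), 'out_a'),
    Stage('b', 'pardo', b'f', ('in',), 'out_b'),
    Stage('c', 'pardo', b'g', ('out_b',), 'out_c'),
]
result = eliminate_common_siblings(stages)
for s in result:
    print(s.name, s.inputs, s.output)
```

Because grouping happens once, before any remapping, the sketch shares the quoted docstring's limitation: it removes only the first duplicated stage after a common input, not a whole duplicated chain.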
[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases
yifanmai commented on a change in pull request #12185: URL: https://github.com/apache/beam/pull/12185#discussion_r465912431 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py ## @@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context): return stages +def eliminate_common_siblings(stages, context): + # type: (Iterable[Stage], TransformContext) -> Iterable[Stage] + """Runs common subexpression elimination for common siblings. + + If stages have common input, an identical transform, and one output each, + then all but one stages will be eliminated, and the output of the remaining + will be connected to the original output PCollections of the eliminated + stages. This elimination runs only once, not recursively, and will only + eliminate the first stage after a common input, rather than a chain of + stages. + """ + + SiblingKey = collections.namedtuple( + 'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id']) + + def get_sibling_key(transform): +"""Returns a key that will be identical for common siblings.""" +transform_output_keys = list(transform.outputs.keys()) +# Return None as the sibling key for ineligible transforms. +if len(transform_output_keys + ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn: + return None +return SiblingKey( +spec_urn=transform.spec.urn, +spec_payload=transform.spec.payload, +inputs=tuple(transform.inputs.items()), +environment_id=transform.environment_id) + + # Group stages by keys. + stages_by_sibling_key = collections.defaultdict(list) + for stage in stages: +transform = only_transform(stage.transforms) +stages_by_sibling_key[get_sibling_key(transform)].append(stage) + + # Eliminate stages and build the output PCollection remapping dictionary. 
+ pcoll_id_remap = {} + for sibling_key, sibling_stages in stages_by_sibling_key.items(): +if sibling_key is None or len(sibling_stages) == 1: + continue +output_pcoll_ids = [ +only_element(stage.transforms[0].outputs.values()) +for stage in sibling_stages +] +to_delete_pcoll_ids = output_pcoll_ids[1:] +for to_delete_pcoll_id in to_delete_pcoll_ids: + pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0] + del context.components.pcollections[to_delete_pcoll_id] +del sibling_stages[1:] + + # Yield stages while remapping output PCollections if needed. + for sibling_key, sibling_stages in stages_by_sibling_key.items(): +for stage in sibling_stages: + input_keys_to_remap = [] + for input_key, input_pcoll_id in stage.transforms[0].inputs.items(): +if input_pcoll_id in pcoll_id_remap: + input_keys_to_remap.append(input_key) + for input_key_to_remap in input_keys_to_remap: +stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[ +stage.transforms[0].inputs[input_key_to_remap]] + yield stage + + +def pack_combiners(stages, context): + # type: (Iterable[Stage], TransformContext) -> Iterator[Stage] + """Packs sibling CombinePerKey stages into a single CombinePerKey. + + If CombinePerKey stages have a common input, one input each, and one output + each, pack the stages into a single stage that runs all CombinePerKeys and + outputs resulting tuples to a new PCollection. A subsequent stage unpacks + tuples from this PCollection and sends them to the original output + PCollections. + """ + + class _UnpackFn(core.DoFn): +"""A DoFn that unpacks a packed to multiple tagged outputs. + +Example: + tags = (T1, T2, ...) + input = (K, (V1, V2, ...)) + output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ... 
+""" + +def __init__(self, tags): + self._tags = tags + +def process(self, element): + key, values = element + return [ + core.pvalue.TaggedOutput(tag, (key, value)) + for tag, value in zip(self._tags, values) + ] + + def _get_fallback_coder_id(): +return context.add_or_get_coder_id( +coders.registry.get_coder(object).to_runner_api(None)) + + def _get_component_coder_id_from_kv_coder(coder, index): +assert index < 2 +if coder.spec.urn == common_urns.coders.KV.urn and len( +coder.component_coder_ids) == 2: + return coder.component_coder_ids[index] +return _get_fallback_coder_id() + + def _get_key_coder_id_from_kv_coder(coder): +return _get_component_coder_id_from_kv_coder(coder, 0) + + def _get_value_coder_id_from_kv_coder(coder): +return _get_component_coder_id_from_kv_coder(coder, 1) + + def _try_fuse_stages(a, b): +if a.can_fuse(b, context): + return a.fuse(b) +else: + raise ValueError + + def _try_merge_environments(env1, env2): +if env1 is None: + return env2 +elif env2 is None: + return env1 +else: + if env1 != env2: +
[GitHub] [beam] davidcavazos commented on pull request #12118: [BEAM-7705] Add BigQuery Java samples
davidcavazos commented on pull request #12118: URL: https://github.com/apache/beam/pull/12118#issuecomment-669345025 Hi @kennknowles, tests are passing now, can you please give it a check whenever you have a chance? Thanks!
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430: URL: https://github.com/apache/beam/pull/12430#discussion_r465902065 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -1029,7 +1040,27 @@ public double getProgress() { private Progress getProgress() { synchronized (splitLock) { if (currentTracker instanceof RestrictionTracker.HasProgress) { -return ((HasProgress) currentTracker).getProgress(); +Progress progress = ((HasProgress) currentTracker).getProgress(); +double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining(); +double completed = +totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted(); +double remaining = +totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex()) ++ progress.getWorkRemaining(); +return Progress.from(completed, remaining); + } +} +return null; + } + + private Progress getProgressFromWindowObservingTruncate(double elementCompleted) { +synchronized (splitLock) { + if (currentWindowIterator != null) { Review comment: Originally the idea was that we didn't want the SDK to have to perform these calculations, which is why each operator was going to report work_completed/work_remaining if it had them, but it seems that accurate splitting by fraction needs to take it into account. Using the graph to compute the progress shouldn't be any more or less difficult than the work that is being put into the SDK. Is there still value in reporting the work_completed/work_remaining metrics then?
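The window scaling in the quoted Java hunk is easier to check with concrete numbers. Below is a minimal Python sketch of the same arithmetic; the function name and parameters are illustrative and not part of any Beam API. It assumes, as the quoted code does, that every window of the element carries the same total work as the window currently being processed, and that `window_index` is the 0-based index of that window (`previousIndex()` in the Java once `next()` has returned it, so `nextIndex()` is `window_index + 1`).

```python
def scale_progress(work_completed, work_remaining, window_index, num_windows):
    """Scales one window's progress to all windows of the current element."""
    total_work = work_completed + work_remaining
    # Windows before the current one are fully done.
    completed = total_work * window_index + work_completed
    # Windows after the current one have not been started.
    remaining = total_work * (num_windows - window_index - 1) + work_remaining
    return completed, remaining


# Second of three windows, with 2 of 10 units done in the current window.
print(scale_progress(2.0, 8.0, 1, 3))  # (12.0, 18.0)
```

The two results always sum to `total_work * num_windows`, so the fraction completed is expressed relative to the whole element rather than to a single window.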
[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK
lostluck commented on a change in pull request #12445: URL: https://github.com/apache/beam/pull/12445#discussion_r465879720 ## File path: sdks/go/pkg/beam/external.go ## @@ -16,10 +16,144 @@ package beam import ( + "context" + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" + jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1" + pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" + "google.golang.org/grpc" ) +// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively +type ExternalTransform struct { Review comment: Per the other thread, please move these to parameters on CrossLanguage and TryCrossLanguage instead. Do not try to force in compatibility with the legacy External, it's OK for them to have two separate calls and paths. By having them as a struct it's not clear what is required and what is not, and the compiler won't help the user by failing at compile time. An aside: The other issue here is you've mixed up user side parameters with internal implementation details, and made them part of the API surface. APIs are easiest to use when the user knows how to fill everything and what is required or not. The components and Expanded transform and requirements fields are not something that users would be filling in for example. Types are cheap. Make a new type instead of trying to reuse something that almost fits. 
## File path: sdks/go/pkg/beam/external.go ## @@ -16,10 +16,151 @@ package beam import ( + "context" + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" + jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1" + pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" + "google.golang.org/grpc" ) +// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively +type ExternalTransform struct { + idint + Urn string + Payload []byte + In[]PCollection + Out []FullType + Bounded bool + ExpansionAddr string + Components*pipepb.Components + ExpandedTransform *pipepb.PTransform + Requirements []string +} + +// CrossLanguage is the temporary API to execute external transforms +// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure +func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection { + if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set Review comment: Note, by this comment, the intent was for you to remove this dead code, as it's unnecessary. 
## File path: sdks/go/pkg/beam/external.go ## @@ -16,10 +16,144 @@ package beam import ( + "context" + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" + jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1" + pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" + "google.golang.org/grpc" ) +// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively +type ExternalTransform struct { + idint + Urn string + Payload []byte + In[]PCollection + Out []FullType + Bounded bool + ExpansionAddr string + Components*pipepb.Components + ExpandedTransform *pipepb.PTransform + Requirements []string Review comment: Move these to a graph.CrossLanguage struct, but have their proto types be interface{} instead, with a comment about what the types should be. Given those fields are only used by beam framework internals, there's little risk in using type assertions for them in the right places, such as the graphx package. ## File path: sdks/go/pkg/beam/external.go ## @@ -16,10 +16,151 @@ package beam import ( + "context" + "fmt" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" +
[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK
lostluck commented on a change in pull request #12445: URL: https://github.com/apache/beam/pull/12445#discussion_r464721180
## File path: sdks/go/pkg/beam/external.go ##
@@ -16,10 +16,151 @@
 package beam
 import (
+    "context"
+    "fmt"
+    "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+    "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
     "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+    jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+    pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+    "google.golang.org/grpc"
 )
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+    id                int
+    Urn               string
+    Payload           []byte
+    In                []PCollection
+    Out               []FullType
+    Bounded           bool
+    ExpansionAddr     string
+    Components        *pipepb.Components
+    ExpandedTransform *pipepb.PTransform
+    Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+    if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+        // return Legacy External API
+    }
+
+    /*
+        Add ExternalTranform to the Graph
+    */
+    // Validating scope and inputs
+    if !s.IsValid() {
+        // return nil, errors.New("invalid scope")
+        fmt.Println("invalid scope")
+    }
+    for i, col := range e.In {
+        if !col.IsValid() {
+            // return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+            fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+        }
+    }
+
+    // Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+    payload := &graph.Payload{
+        URN:  e.Urn,
+        Data: e.Payload,
+    }
+    var ins []*graph.Node
+    for _, col := range e.In {
+        ins = append(ins, col.n)
+    }
+    edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+    // TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+    // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+    if p.ExpandedTransforms == nil {
+        p.ExpandedTransforms = make(map[string]*ExternalTransform)
+    }
+    p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment: As discussed, this data can be part of the graph.External node (or a new graph.CrossLanguage struct if desired) which keeps it as part of the graph and can be handled appropriately in graphx/translate.go. There's absolutely no need to add a new way to pass information in through the pipeline OR the suggestion you have for scope. Use the existing abstraction. If it's not sufficient, please articulate why.
[GitHub] [beam] boyuanzz commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
boyuanzz commented on a change in pull request #12430: URL: https://github.com/apache/beam/pull/12430#discussion_r465891961
## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment: Thanks for the formula. If we let the runner do the computation, the runner needs to look up the graph to figure out which progress comes from downstream. I'm thinking about another option: we can add a progress signal specifically for split, which should be reported by the root node of the SDK graph. All the computations you have mentioned can be done recursively on the SDK side. The runner can decide whether to look at this signal based on whether there is an SDF (or several SDFs) in the SDK graph.
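The window-scaling arithmetic in the diff quoted above can be illustrated with a minimal Python sketch. This is not the harness code: the `Progress` tuple and the function name are hypothetical, and the sketch assumes that every window of the element carries the same total work as the current one, and that the Java iterator has already advanced onto the current window (so `previousIndex()` equals the current window's index and `nextIndex()` is one past it).

```python
from typing import NamedTuple


class Progress(NamedTuple):
    """Hypothetical stand-in for a (work completed, work remaining) pair."""
    completed: float
    remaining: float


def scale_progress_across_windows(
    current: Progress, current_window_index: int, num_windows: int
) -> Progress:
    """Scale the progress of the current window to the whole element.

    Assumes each window has the same total work as the current window.
    """
    if not 0 <= current_window_index < num_windows:
        raise ValueError("current_window_index must be in [0, num_windows)")
    total_work = current.completed + current.remaining
    # Windows before the current one count as fully completed.
    completed = total_work * current_window_index + current.completed
    # Windows after the current one count as fully remaining.
    windows_left = num_windows - current_window_index - 1
    remaining = total_work * windows_left + current.remaining
    return Progress(completed, remaining)


# Second of four windows, three quarters of the way through it.
scaled = scale_progress_across_windows(Progress(3.0, 1.0), 1, 4)
print(scaled)
```

Under these assumptions the scaled completed and remaining values always sum to the per-window total multiplied by the number of windows.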
[GitHub] [beam] tysonjh commented on pull request #12408: [BEAM-10602] Display Python streaming metrics in Grafana dashboard
tysonjh commented on pull request #12408: URL: https://github.com/apache/beam/pull/12408#issuecomment-669322906 I noticed there aren't any results displaying for any other ParDo load tests now. Could this change have broken those?
[GitHub] [beam] davidcavazos opened a new pull request #12472: [BEAM-7390] Add sum code snippets
davidcavazos opened a new pull request #12472: URL: https://github.com/apache/beam/pull/12472 Add `Sum` code snippets. R: @aaltay R: @rosetn Staged: http://apache-beam-website-pull-requests.storage.googleapis.com/12462/documentation/transforms/python/aggregation/sum/index.html
[GitHub] [beam] lukecwik commented on a change in pull request #12367: [BEAM-10564] Support more Avro field name formats when mapping to Jav…
lukecwik commented on a change in pull request #12367: URL: https://github.com/apache/beam/pull/12367#discussion_r465884172
## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java ##
@@ -601,12 +601,26 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
   private Map getMapping(Schema schema) {
     Map mapping = Maps.newHashMap();
     for (Field field : schema.getFields()) {
-      String underscore = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName());
-      mapping.put(underscore, field.getName());
+      String fieldName = field.getName();
+      String getter;
+      if (fieldName.contains("_")) {
+        if (Character.isLowerCase(fieldName.charAt(0))) {
+          // field_name -> fieldName
+          getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, fieldName);
+        } else {
+          // FIELD_NAME -> fIELDNAME
+          getter = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, fieldName.replace("_", ""));

Review comment:
```suggestion
          getter = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, fieldName.replace("_", ""));
```
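To make the difference between the conversions discussed above concrete, here is a minimal Python sketch. The two helpers are hypothetical stand-ins that mimic, for plain ASCII names with single underscores, how Guava's `CaseFormat` is understood to split and re-case words; they are not Guava itself, and the real behaviour should be confirmed against the library before relying on it.

```python
def underscore_to_lower_camel(name: str) -> str:
    """Rough stand-in for an underscore-separated format converted to LOWER_CAMEL.

    Assumption: words are split on '_', the first word is lowercased and
    each later word is capitalised.
    """
    first, *rest = name.split("_")
    return first.lower() + "".join(w[:1].upper() + w[1:].lower() for w in rest)


def upper_camel_to_lower_camel(name: str) -> str:
    """Rough stand-in for UPPER_CAMEL converted to LOWER_CAMEL.

    Assumption: only the first character changes case.
    """
    return name[:1].lower() + name[1:]


stripped = "FIELD_NAME".replace("_", "")

print(underscore_to_lower_camel("field_name"))  # fieldName
print(underscore_to_lower_camel("FIELD_NAME"))  # fieldName
print(underscore_to_lower_camel(stripped))      # fieldname
print(upper_camel_to_lower_camel(stripped))     # fIELDNAME
```

Under this emulation, removing the underscores before an underscore-based conversion leaves a single word, so the result depends on whether `replace("_", "")` is applied to the input first.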
[GitHub] [beam] kkucharc commented on pull request #12435: [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow
kkucharc commented on pull request #12435: URL: https://github.com/apache/beam/pull/12435#issuecomment-669316184 r: @tysonjh would you find time to take a look?
[GitHub] [beam] lostluck commented on pull request #12471: [BEAM-9615] Add initial Schema to Go conversions.
lostluck commented on pull request #12471: URL: https://github.com/apache/beam/pull/12471#issuecomment-669307726 R: @youngoli
[GitHub] [beam] lostluck opened a new pull request #12471: [BEAM-9615] Add initial Schema to Go conversions.
lostluck opened a new pull request #12471: URL: https://github.com/apache/beam/pull/12471 Adds initial Schema to Go type conversions and vice versa. In particular, it handles the "easy" cases: basic struct to Row conversion, slices, maps, and basic field name conversions. It doesn't implement struct+schema compatibility comparisons, which are necessary at pipeline construction time. However, this is more than sufficient to allow other immediately incoming code to unlock the row coder tests in standard_coders.yaml.
[GitHub] [beam] apilloud merged pull request #12416: Update google-api-services versions.
apilloud merged pull request #12416: URL: https://github.com/apache/beam/pull/12416
[GitHub] [beam] tysonjh edited a comment on pull request #12416: Update google-api-services versions.
tysonjh edited a comment on pull request #12416: URL: https://github.com/apache/beam/pull/12416#issuecomment-669303736
> > Failed to publish publication 'mavenJava' to repository 'mavenLocal'
> > > Invalid publication 'mavenJava': supplied version does not match POM file (cannot edit version directly in the POM file).
> >
> > Not sure what this means, will need to investigate unless you know.
>
> May just need to run a ./gradlew clean (https://issues.apache.org/jira/browse/BEAM-9437). Trying that.

What a mess. I spent a good amount of time, about 3 hours, trying to get beam-linkage-check.sh to work. I ran into numerous issues: a problem with the ':sdks:java:bom:publishMavenJavaPublicationToMavenLocal' task, problems building Flink tests for 1.8 and 1.9, Apex runner tasks executing even though the runner is gone, major/minor version mismatches in Java classes, and unsupported errorprone versions. I reached out to @lukecwik and @apilloud for help as well (thank you both).

What worked in the end:
* ensure using Java 8 (both binary and classpath)
* a fresh new clone of apache/beam

https://gist.github.com/tysonjh/d8d28689fc18b536b32498cb0fd27382
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430: URL: https://github.com/apache/beam/pull/12430#discussion_r465855425
## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment: Yes, truncate exposes some of the issues where a non-root SDF makes things interesting. For option 2, if we assume that `inprogress` is included in `remaining`, then:
```
total = completed + remaining
fraction_completed = (inprogress / remaining) * downstream_fraction_completed + completed / total
```
`downstream_fraction_completed` would be computed recursively, and with multiple consumers it could be computed as `(completed_consumers + downstream_fraction_completed) / total_num_consumers`.
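The formula in the comment above can be transcribed literally into a short Python sketch. The function names are hypothetical, the zero-division guards are an added assumption, and the sketch makes no claim that this is how the runner or SDK actually computes progress; it only evaluates the expressions as written.

```python
def fraction_completed(
    completed: float,
    remaining: float,
    inprogress: float,
    downstream_fraction_completed: float,
) -> float:
    """Literal transcription of the comment's formula.

    `inprogress` is assumed to be included in `remaining`.
    The guards for remaining == 0 and total == 0 are assumptions
    added here to avoid division by zero.
    """
    total = completed + remaining
    if total == 0:
        return 0.0
    in_progress_share = inprogress / remaining if remaining else 0.0
    return in_progress_share * downstream_fraction_completed + completed / total


def downstream_fraction_for_consumers(
    completed_consumers: int,
    downstream_fraction_completed: float,
    total_num_consumers: int,
) -> float:
    """Literal transcription of the multi-consumer expression."""
    return (completed_consumers + downstream_fraction_completed) / total_num_consumers


# One of two consumers finished, the other half done downstream.
downstream = downstream_fraction_for_consumers(1, 0.5, 2)
print(downstream)
print(fraction_completed(2.0, 2.0, 1.0, 0.5))
```

In a recursive setting, the value returned for one stage would be passed as `downstream_fraction_completed` to the stage upstream of it.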
[GitHub] [beam] lostluck merged pull request #12467: [BEAM-7996] Add map & nil encoding to Go SDK.
lostluck merged pull request #12467: URL: https://github.com/apache/beam/pull/12467
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465840738
## File path: .github/workflows/python_tests.yml ##
@@ -0,0 +1,196 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Python tests
+
+on:
+  schedule:
+    - cron: '10 2 * * *'
+  push:
+    branches: ['master', 'release-*']
+    tags: 'v*'
+  pull_request:
+    branches: ['master', 'release-*']
+    tags: 'v*'
+    paths: ['sdks/python/**', 'model/**']
+
+
+env:
+  GCP_WHEELS_STAGING_PATH: "gs://${{ secrets.GCP_BUCKET }}/${GITHUB_REF##*/}/${GITHUB_SHA}-${GITHUB_RUN_ID}/"
+  GCP_BUCKET: ${{ secrets.GCP_BUCKET }} # beam-wheels-staging
+  GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} # apache-beam-testing
+  GCP_REGION: ${{ secrets.GCP_REGION}} # us-central
+
+jobs:
+
+  build_python_sdk_source:
+    name: 'Build Python SDK Source'
+    if: github.repository_owner == 'apache' && (github.event_name == 'push' || github.event_name == 'schedule')

Review comment:
> Does github impose any limits on how many workflow could run in parallel?

Limits for concurrent jobs range from 20 to 180, depending on the plan ([docs](https://docs.github.com/en/actions/getting-started-with-github-actions/about-github-actions#usage-limits)). I heard that Apache has an Enterprise plan for GitHub Actions, but I am not sure about it. Do you have more information about it, @gmcdonald?
[GitHub] [beam] TobKed commented on pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on pull request #12452: URL: https://github.com/apache/beam/pull/12452#issuecomment-669283521
@tvalentyn thanks for the review.
> * Is there a way how an author or a reviewer can rerun the action to retry the test or the only way is to re-push the PR?

There is an event type called `workflow_dispatch` that enables a button to trigger the workflow manually. I just added it. https://github.blog/changelog/2020-07-06-github-actions-manual-triggers-with-workflow_dispatch/
> * When we run unit tests, we create xml files with test results (see `generated xml file: D:\a\beam\beam\sdks\python\pytest_py36-win.xml ` entries in console logs). It would be great if we could somehow visualize that to see failing tests/messages easily without having to dig through console logs. Are there any options to do that? This could arrive in future PRs.

I am not sure about visualisation; I have to investigate it in more depth. However, I have added uploads of the xml files as artifacts.
[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack
aromanenko-dev commented on a change in pull request #12422: URL: https://github.com/apache/beam/pull/12422#discussion_r465702081
## File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java ##
@@ -39,16 +40,27 @@
   private final String secretKey;
   private final Regions region;
   private final @Nullable String serviceEndpoint;
+  private final boolean verifyCertificate;
   BasicKinesisProvider(
-      String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
+      String accessKey,
+      String secretKey,
+      Regions region,
+      @Nullable String serviceEndpoint,
+      boolean verifyCertificate) {
     checkArgument(accessKey != null, "accessKey can not be null");
     checkArgument(secretKey != null, "secretKey can not be null");
     checkArgument(region != null, "region can not be null");
     this.accessKey = accessKey;
     this.secretKey = secretKey;
     this.region = region;
     this.serviceEndpoint = serviceEndpoint;
+    this.verifyCertificate = verifyCertificate;
+  }
+
+  BasicKinesisProvider(

Review comment: Why did you decide to modify `BasicKinesisProvider` rather than create a new provider class for testing that extends this basic one?

## File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java ##
@@ -95,32 +123,114 @@ private void runRead() {
     PCollection output =
         pipelineRead.apply(
             KinesisIO.read()
-                .withStreamName(options.getAwsKinesisStream())
+                .withStreamName(streamName)
                 .withAWSClientsProvider(
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
-                    Regions.fromName(options.getAwsKinesisRegion()))
-                .withMaxNumRecords(numberOfRows)
+                    Regions.fromName(options.getAwsKinesisRegion()),
+                    options.getAwsServiceEndpoint(),
+                    options.getAwsVerifyCertificate())
+                .withMaxNumRecords(options.getNumberOfRecords())
                 // to prevent endless running in case of error
-                .withMaxReadTime(Duration.standardMinutes(10))
-                .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
-                .withInitialTimestampInStream(now)
+                .withMaxReadTime(Duration.standardMinutes(10L))
+                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                 .withRequestRecordsLimit(1000));
     PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-        .isEqualTo((long) numberOfRows);
+        .isEqualTo((long) options.getNumberOfRecords());
     PCollection consolidatedHashcode =
         output
             .apply(ParDo.of(new ExtractDataValues()))
             .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
     pipelineRead.run().waitUntilFinish();
   }
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+
+    localstackContainer =
+        new LocalStackContainer(LOCALSTACK_VERSION)
+            .withServices(LocalStackContainer.Service.KINESIS)
+            .withEnv("USE_SSL", "true")
+            .withStartupAttempts(3);
+    localstackContainer.start();
+
+    options.setAwsServiceEndpoint(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https"));
+    options.setAwsKinesisRegion(
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion());
+    options.setAwsAccessKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
+    options.setAwsSecretKey(
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
+    options.setNumberOfRecords(1000);
+    options.setNumberOfShards(1);
+    options.setAwsKinesisStream("beam_kinesis_test");
+    options.setAwsVerifyCertificate(false);
+  }
+
+  private static AmazonKinesis createKinesisClient() {
+    AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
+
+    AWSCredentialsProvider credentialsProvider =
+        new AWSStaticCredentialsProvider(
+            new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey()));
+    clientBuil
[GitHub] [beam] kkucharc commented on pull request #12435: [WIP] [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow
kkucharc commented on pull request #12435: URL: https://github.com/apache/beam/pull/12435#issuecomment-669252849 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] kkucharc commented on pull request #12435: [WIP] [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow
kkucharc commented on pull request #12435: URL: https://github.com/apache/beam/pull/12435#issuecomment-669252738 Run Python Load Tests ParDo Dataflow Batch
[GitHub] [beam] kkucharc merged pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK
kkucharc merged pull request #12151: URL: https://github.com/apache/beam/pull/12151
[GitHub] [beam] kkucharc commented on pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK
kkucharc commented on pull request #12151: URL: https://github.com/apache/beam/pull/12151#issuecomment-669206144 I updated CHANGES.md as requested. It is approved by @pabloem so I am merging this PR. Thank you a lot Pablo for the review! :)
[GitHub] [beam] kkucharc commented on pull request #12435: [WIP] [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow
kkucharc commented on pull request #12435: URL: https://github.com/apache/beam/pull/12435#issuecomment-669223084 run seed job
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465753725 ## File path: sdks/python/apache_beam/typehints/typecheck_test_py3.py ## @@ -92,23 +93,29 @@ def test_wrapper_pass_through(self): # We use a file to check the result because the MyDoFn instance passed is # not the same one that actually runs in the pipeline (it is serialized # here and deserialized in the worker). -with tempfile.NamedTemporaryFile(mode='w+t') as f: - dofn = MyDoFn(f.name) + +fd, path = tempfile.mkstemp() +try: + os.close(fd) Review comment: WDYT about adding comment in such places with JIRA ? (as you mentioned creating JIRA in https://github.com/apache/beam/pull/12452#discussion_r465209691)
[GitHub] [beam] TobKed commented on pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on pull request #12452: URL: https://github.com/apache/beam/pull/12452#issuecomment-669219601 @tvalentyn answering your question: > Also, all actions workflow run in parallel, right? so it takes roughly the same time as running a regular precommit? In such case we could consider running it on PR. Building sdk source dist is used as prerequisite for `python_wordcount_dataflow` job . Building source distribution is already validated for PR in `.github/workflows/build_wheels.yml` workflow.
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465736828 ## File path: sdks/python/apache_beam/io/parquetio_test.py ## @@ -296,8 +296,10 @@ def test_sink_transform_int96(self): path, self.SCHEMA96, num_shards=1, shard_name_template='') def test_sink_transform(self): -with tempfile.NamedTemporaryFile() as dst: - path = dst.name +fd, path = tempfile.mkstemp() Review comment: There was a problem on Windows because `with tempfile.NamedTemporaryFile() as dst` provides a temporary file that is already open. On Windows an open file seems to be read-only, which caused a PermissionError. I know this workaround is ugly, but it is the best way I found to do it. Won't a separate commit with the JIRA be lost after merging?
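A minimal sketch of the workaround described in the comment above, not the code from the PR: `write_then_read` is a hypothetical helper used only for illustration. It relies on the documented behaviour that a `NamedTemporaryFile` cannot reliably be reopened by name on Windows while it is still open, so the descriptor returned by `mkstemp` is closed before the path is reused and the file is removed by hand.

```python
import os
import tempfile


def write_then_read(text):
    """Round-trip text through a temp file that is closed before reuse.

    Hypothetical helper for illustration; it is not part of the Beam tests.
    """
    # mkstemp returns an already-open OS-level descriptor plus the path.
    fd, path = tempfile.mkstemp()
    try:
        # Close our handle first so other code can reopen the path by name.
        os.close(fd)
        with open(path, 'w') as f:
            f.write(text)
        with open(path) as f:
            return f.read()
    finally:
        # mkstemp never deletes the file for us, so clean up explicitly.
        os.remove(path)
```

The `try`/`finally` replaces the cleanup that the `with tempfile.NamedTemporaryFile()` form used to provide for free, which is presumably why the comment calls the workaround ugly.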
[GitHub] [beam] prodonjs commented on a change in pull request #12460: [BEAM-10545] HtmlView module
prodonjs commented on a change in pull request #12460: URL: https://github.com/apache/beam/pull/12460#discussion_r465764186 ## File path: sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/common/HtmlView.tsx ## @@ -0,0 +1,119 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +import * as React from 'react'; + +export interface IHtmlProvider { + readonly html: string; + readonly script: Array; Review comment: *nit* I think it's generally preferred to use `string[]` for primitive array types. ## File path: sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/common/HtmlView.tsx ## @@ -0,0 +1,119 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. 
+ +import * as React from 'react'; + +export interface IHtmlProvider { + readonly html: string; + readonly script: Array; +} + +interface IHtmlViewProps { + htmlProvider: IHtmlProvider; +} + +interface IHtmlViewState { + innerHtml: string; + script: Array; +} + +/** + * A common HTML viewing component that renders given HTML and executes scripts + * from the given provider. + */ +export class HtmlView extends React.Component { + constructor(props: IHtmlViewProps) { +super(props); +this.state = { + innerHtml: props.htmlProvider.html, + script: [] +}; + } + + componentDidMount(): void { +this._updateRenderTimerId = setInterval(() => this.updateRender(), 1000); + } + + componentWillUnmount(): void { +clearInterval(this._updateRenderTimerId); + } + + updateRender(): void { +const currentHtml = this.state.innerHtml; +const htmlToUpdate = this.props.htmlProvider.html; +const currentScript = this.state.script; +const scriptToUpdate = [...this.props.htmlProvider.script]; +if (htmlToUpdate !== currentHtml) { + this.setState({ +innerHtml: htmlToUpdate, +// As long as the html is updated, clear the script state. +script: [] + }); +} +/* Depending on whether this iteration updates the html, the scripts + * are executed differently. + * Html updated: all scripts are new, start execution from index 0; + * Html not updated: only newly added scripts need to be executed. + */ +const currentScriptLength = + htmlToUpdate === currentHtml ? currentScript.length : 0; +if (scriptToUpdate.length > currentScriptLength) { + this.setState( +{ + script: scriptToUpdate +}, +// Executes scripts once the state is updated. +() => { + for (let i = currentScriptLength; i < scriptToUpdate.length; ++i) { +new Function(scriptToUpdate[i])(); + } +} + ); +} + } + + render(): React.ReactNode { +return ( + // This injects raw HTML fetched from kernel into JSX. 
+ Review comment: Haha as the property name suggests, this is definitely a risky operation from a security perspective since the markup being injected has the ability to run scripts in the user's context with access to whatever resources their credentials provide. Is there any alternative that could be considered? Could you create an `` element and render the HTML there so that it's sandboxed? I don't have any fundamental objections to this as I don't know the full context, but I know this type of thing has been looked upon with significant concern by our internal security reviewers and has implications for us being able to include certain types of extensions in our enterprise product. ## File path: sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/common/HtmlView.tsx ## @@ -0,0 +1,119 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http:/
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465774681 ## File path: sdks/python/apache_beam/runners/worker/log_handler_test.py ## @@ -87,7 +87,8 @@ def _verify_fn_log_handler(self, num_log_entries): self.assertEqual( '%s: %s' % (msg, num_received_log_entries), log_entry.message) self.assertTrue( -re.match(r'.*/log_handler_test.py:\d+', log_entry.log_location), +re.match( +r'.*(/|\\)log_handler_test.py:\d+', log_entry.log_location), Review comment: I used `rf'.*{os.sep}log_handler_test.py:\d+'` but it caused: ``` raise source.error("bad escape %s" % escape, len(escape)) sre_constants.error: bad escape \l at position 2 ``` https://github.com/apache/beam/pull/12452/checks?check_run_id=949595653 I reverted the change. Should I use it somehow differently?
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465774681 ## File path: sdks/python/apache_beam/runners/worker/log_handler_test.py ## @@ -87,7 +87,8 @@ def _verify_fn_log_handler(self, num_log_entries): self.assertEqual( '%s: %s' % (msg, num_received_log_entries), log_entry.message) self.assertTrue( -re.match(r'.*/log_handler_test.py:\d+', log_entry.log_location), +re.match( +r'.*(/|\\)log_handler_test.py:\d+', log_entry.log_location), Review comment: I used `rf'.*{os.sep}log_handler_test.py:\d+'` but it caused: ``` 2020-08-05T14:25:17.1905260Z > raise source.error("bad escape %s" % escape, len(escape)) 2020-08-05T14:25:17.1905424Z E sre_constants.error: bad escape \l at position 2 ``` https://github.com/apache/beam/pull/12452/checks?check_run_id=949595653 I reverted the change. Should I use it somehow differently?
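One possible answer to the question above, offered as a sketch rather than as the fix that was eventually merged: on Windows `os.sep` is a single backslash, so interpolating it raw into the pattern produces `\l`, which the `re` module rejects. Passing the separator through `re.escape` avoids that. The helper name `log_location_pattern` is hypothetical, and the separator is taken as a parameter only so both cases can be exercised on any platform.

```python
import re


def log_location_pattern(sep):
    """Build the log-location regex for a given path separator.

    Hypothetical helper; in a test, `sep` would be os.sep.
    """
    # re.escape turns a backslash separator into an escaped backslash, so
    # the "l" that follows is not parsed as the invalid escape "\l".
    return re.compile(r'.*' + re.escape(sep) + r'log_handler_test\.py:\d+')
```

With this, `log_location_pattern(os.sep).match(log_entry.log_location)` would work on either platform, whereas the `(/|\\)` alternation in the diff accepts both separators regardless of where the test runs.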
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465754369 ## File path: .github/workflows/python_tests.yml ## @@ -0,0 +1,196 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Python tests + +on: + schedule: +- cron: '10 2 * * *' + push: +branches: ['master', 'release-*'] +tags: 'v*' + pull_request: +branches: ['master', 'release-*'] +tags: 'v*' +paths: ['sdks/python/**', 'model/**'] + + +env: + GCP_WHEELS_STAGING_PATH: "gs://${{ secrets.GCP_BUCKET }}/${GITHUB_REF##*/}/${GITHUB_SHA}-${GITHUB_RUN_ID}/" Review comment: It is an unnecessary copy-paste leftover. Deleted.
[GitHub] [beam] kkucharc commented on pull request #12435: [WIP] [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow
kkucharc commented on pull request #12435: URL: https://github.com/apache/beam/pull/12435#issuecomment-669225991 Run Python Load Tests ParDo Dataflow Streaming
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465730296 ## File path: .github/workflows/python_tests.yml ## @@ -0,0 +1,196 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Python tests + +on: + schedule: +- cron: '10 2 * * *' + push: +branches: ['master', 'release-*'] +tags: 'v*' + pull_request: +branches: ['master', 'release-*'] +tags: 'v*' +paths: ['sdks/python/**', 'model/**'] + + +env: + GCP_WHEELS_STAGING_PATH: "gs://${{ secrets.GCP_BUCKET }}/${GITHUB_REF##*/}/${GITHUB_SHA}-${GITHUB_RUN_ID}/" + GCP_BUCKET: ${{ secrets.GCP_BUCKET }} # beam-wheels-staging + GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} # apache-beam-testing + GCP_REGION: ${{ secrets.GCP_REGION}} # us-central + +jobs: + + build_python_sdk_source: +name: 'Build Python SDK Source' +if: github.repository_owner == 'apache' && (github.event_name == 'push' || github.event_name == 'schedule') +runs-on: ubuntu-latest +steps: + - name: Checkout code +uses: actions/checkout@v2 + - name: Install python +uses: actions/setup-python@v2 +with: + python-version: 3.7 + - name: Get build dependencies +working-directory: ./sdks/python +run: pip install -r build-requirements.txt + - name: Install wheels 
+run: pip install wheel + - name: Build source +working-directory: ./sdks/python +run: python setup.py sdist --formats=zip + - name: Add checksums +working-directory: ./sdks/python/dist +run: | + file=$(ls | grep .zip | head -n 1) + sha512sum $file > ${file}.sha512 + - name: Unzip source +working-directory: ./sdks/python +run: unzip dist/$(ls dist | grep .zip | head -n 1) + - name: Rename source directory +working-directory: ./sdks/python +run: mv $(ls | grep apache-beam) apache-beam-source + - name: Upload compressed sources as artifacts +uses: actions/upload-artifact@v2 +with: + name: source_zip + path: sdks/python/dist + + python_unit_tests: +name: 'Python Unit Tests' +runs-on: ${{ matrix.os }} +strategy: + fail-fast: false + matrix: +os: [ubuntu-latest, macos-latest, windows-latest] +params: [ + {"py_ver": "3.5", "tox_env": "py35"}, + {"py_ver": "3.6", "tox_env": "py36"}, + {"py_ver": "3.7", "tox_env": "py37"}, + {"py_ver": "3.8", "tox_env": "py38"}, +] +exclude: + # TODO remove exclusion after issue with protobuf is solved + # https://github.com/protocolbuffers/protobuf/issues/7765 + - os: windows-latest +params: {"py_ver": "3.8", "tox_env": "py38"} +steps: + - name: Checkout code +uses: actions/checkout@v2 + - name: Install python +uses: actions/setup-python@v2 +with: + python-version: ${{ matrix.params.py_ver }} + - name: Get build dependencies +working-directory: ./sdks/python +run: pip install -r build-requirements.txt + - name: Install tox +run: pip install tox + - name: Run tests basic unix +if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos') +working-directory: ./sdks/python +run: tox -c tox.ini -e ${{ matrix.params.tox_env }} + - name: Run tests basic windows +if: startsWith(matrix.os, 'windows') +working-directory: ./sdks/python +run: tox -c tox.ini -e ${{ matrix.params.tox_env }}-win + + python_wordcount_direct_runner: +name: 'Python Wordcount Direct Runner' +runs-on: ${{ matrix.os }} +strategy: + fail-fast: false + matrix: +os: 
[ubuntu-latest, macos-latest, windows-latest] +python: [3.5, 3.6, 3.7, 3.8] +exclude: + # TODO remove exclusion after issue with protobuf is solved + # https://github.com/protocolbuffers/protobuf/issues/7765 + - os: windows-latest +python: 3.8 +steps: Review comment: Right now it is not possible, however GitHub Actions is evolving so fast
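To make the effect of the `matrix` and `exclude` keys in the workflow above concrete, here is a small Python sketch of how such a matrix expands. It is only an illustration: GitHub Actions performs this expansion itself, and `expand_matrix` is a hypothetical name. The sketch assumes an exclude entry drops every combination whose listed keys all match.

```python
from itertools import product


def expand_matrix(axes, exclude=()):
    """Return every axis combination not matched by an exclude entry."""
    names = list(axes)
    combos = [dict(zip(names, values)) for values in product(*axes.values())]
    # An exclude entry removes a combination when all of its keys match.
    return [
        combo for combo in combos
        if not any(
            all(combo[key] == value for key, value in entry.items())
            for entry in exclude
        )
    ]


jobs = expand_matrix(
    {
        'os': ['ubuntu-latest', 'macos-latest', 'windows-latest'],
        'python': ['3.5', '3.6', '3.7', '3.8'],
    },
    exclude=[{'os': 'windows-latest', 'python': '3.8'}],
)
```

Three operating systems times four Python versions gives twelve combinations, and the single exclusion for Python 3.8 on Windows leaves eleven jobs.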
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465753326 ## File path: sdks/python/apache_beam/runners/worker/log_handler_test.py ## @@ -87,7 +87,8 @@ def _verify_fn_log_handler(self, num_log_entries): self.assertEqual( '%s: %s' % (msg, num_received_log_entries), log_entry.message) self.assertTrue( -re.match(r'.*/log_handler_test.py:\d+', log_entry.log_location), +re.match( +r'.*(/|\\)log_handler_test.py:\d+', log_entry.log_location), Review comment: Added fix. Thanks.
[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms
TobKed commented on a change in pull request #12452: URL: https://github.com/apache/beam/pull/12452#discussion_r465753725 ## File path: sdks/python/apache_beam/typehints/typecheck_test_py3.py ## @@ -92,23 +93,29 @@ def test_wrapper_pass_through(self): # We use a file to check the result because the MyDoFn instance passed is # not the same one that actually runs in the pipeline (it is serialized # here and deserialized in the worker). -with tempfile.NamedTemporaryFile(mode='w+t') as f: - dofn = MyDoFn(f.name) + +fd, path = tempfile.mkstemp() +try: + os.close(fd) Review comment: WDYT about adding comment in such places with JIRA ?
[GitHub] [beam] kkucharc merged pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read
kkucharc merged pull request #12149: URL: https://github.com/apache/beam/pull/12149
[GitHub] [beam] damgad commented on pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read
damgad commented on pull request #12149: URL: https://github.com/apache/beam/pull/12149#issuecomment-669180916 @kkucharc Indeed, `Whitespace PreCommit` is not yet merged to master so technically shouldn't be here. It appeared because the seed job was run from the PR you linked. There's probably a formatting issue with trailing whitespaces, but you can disregard it and merge.
[GitHub] [beam] damgad commented on a change in pull request #12444: Added a whitespace lint as part of python lint precommit
damgad commented on a change in pull request #12444: URL: https://github.com/apache/beam/pull/12444#discussion_r465674025 ## File path: .test-infra/jenkins/job_PreCommit_Whitespace.groovy ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import PrecommitJobBuilder + +PrecommitJobBuilder builder = new PrecommitJobBuilder( +scope: this, +nameBase: 'Whitespace', +gradleTask: ':whitespacePreCommit', +triggerPathPatterns: [ + '.*\\.md$', + '.*build\\.gradle$', +] Review comment: Hey, no worries. The regex looks fine, and I've also checked that it works. FYI: by running the seed job from the PR you can see your changes in action on Jenkins. However (practically) only committers could trigger that.
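As a quick sanity check of the two trigger patterns discussed above, the following Python sketch applies the same regular expressions to a few made-up paths. How Jenkins actually applies `triggerPathPatterns` (full match versus search, and relative to which directory) is an assumption here, and `triggers_job` is a hypothetical helper; the sketch only shows which file names the expressions themselves accept.

```python
import re

# The Groovy strings '.*\\.md$' and '.*build\\.gradle$' unescape to these
# regular expressions.
TRIGGER_PATTERNS = [r'.*\.md$', r'.*build\.gradle$']


def triggers_job(path):
    """Return True if any trigger pattern matches the changed path."""
    return any(re.match(pattern, path) for pattern in TRIGGER_PATTERNS)
```

Under that assumption, `CHANGES.md` and `sdks/java/build.gradle` would trigger the job, while `build.gradle.kts` or a `.py` file would not, because both patterns are anchored at the end of the path.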
[GitHub] [beam] kkucharc edited a comment on pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read
kkucharc edited a comment on pull request #12149: URL: https://github.com/apache/beam/pull/12149#issuecomment-669144965 I can see that now tests are passing 🎉 I see `LGTM` from @chamikaramj so I'm merging :) Thank you a lot @chamikaramj and @purbanow ! PS: after writing this comment `Run Whitespace PreCommit` appeared, but it seems this is something still in progress [link](https://github.com/apache/beam/pull/12444) so I believe this check appeared here by mistake.
[GitHub] [beam] damgad closed pull request #12470: DISREGARD - test PR
damgad closed pull request #12470: URL: https://github.com/apache/beam/pull/12470
[GitHub] [beam] damgad opened a new pull request #12470: DISREGARD - test PR
damgad opened a new pull request #12470: URL: https://github.com/apache/beam/pull/12470 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[GitHub] [beam] kkucharc commented on pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read
kkucharc commented on pull request #12149: URL: https://github.com/apache/beam/pull/12149#issuecomment-669144965 I can see that now tests are passing 🎉 I see `LGTM` from @chamikaramj so I'm merging :) Thank you a lot @chamikaramj and @purbanow !