[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r466175841



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##
@@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, 
Value value) {
 // TODO: Doing micro to mills truncation, need to throw exception.
 ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), 
timeType, false);
 break;
+  case TYPE_DATETIME:
+// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for 
ZetaSQL DATETIME type
+// because later it will be unparsed to the string representation of 
timestamp (e.g. "SELECT
+// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT 
TIMESTAMP '2008-12-25
+// 15:30:00:00'"). So we create a wrapper function here such that 
we can later recognize
+// it and customize its unparsing in BeamBigQuerySqlDialect.
+ret =
+rexBuilder()
+.makeCall(
+SqlOperators.createZetaSqlFunction(
+BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,

Review comment:
   It's weird that the overridden `toString()` has never been called.
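
   (Side note: Calcite renders SQL through `SqlDialect#unparseCall` and `SqlNode#unparse`, not `Object#toString()`, which would explain why the override is never hit. Below is a hypothetical sketch of how a dialect can intercept the wrapper function named in the diff above; the class, constant value, and formatting are assumptions, not the PR's actual code.)

```java
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlWriter;

class DateTimeAwareDialect extends SqlDialect {
  // Assumed value; the real constant lives in BeamBigQuerySqlDialect.
  static final String DATETIME_LITERAL_FUNCTION = "datetime_literal";

  DateTimeAwareDialect(Context context) {
    super(context);
  }

  @Override
  public void unparseCall(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
    if (DATETIME_LITERAL_FUNCTION.equals(call.getOperator().getName())) {
      // Unwrap the single operand and print it as a DATETIME literal.
      writer.literal("DATETIME '" + call.operand(0).toString() + "'");
    } else {
      super.unparseCall(writer, call, leftPrec, rightPrec);
    }
  }
}
```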





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] saavannanavati commented on a change in pull request #12352: [BEAM-10549] Improve runtime type checking performance for the Python SDK

2020-08-05 Thread GitBox


saavannanavati commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r466168597



##
File path: sdks/python/apache_beam/runners/worker/opcounters.py
##
@@ -202,6 +209,37 @@ def __init__(
 self._sample_counter = 0
 self._next_sample = 0
 
+self.producer_type_hints = None
+self.producer_full_label = None
+self.producer_parameter_name = None
+
+if producer and hasattr(producer, 'spec') and hasattr(producer.spec,
+  'serialized_fn'):
+  fns = pickler.loads(producer.spec.serialized_fn)
+  if fns:
+if hasattr(fns[0], '_runtime_type_hints'):
+  self.producer_type_hints = fns[0]._runtime_type_hints
+if hasattr(fns[0], '_full_label'):
+  self.producer_full_label = fns[0]._full_label
+if hasattr(fns[0], '_runtime_parameter_name'):
+  self.producer_parameter_name = fns[0]._runtime_parameter_name
+
+self.consumer_type_hints = []
+self.consumer_full_labels = []
+self.consumer_parameter_names = []
+
+if consumers:

Review comment:
   I moved as much logic as possible to the visitor (and removed the 
duplicated code) but kept the producer/consumer distinction because that 
determines whether we type check using the output types (for the producer) 
versus the input types (for the consumer)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] saavannanavati commented on a change in pull request #12352: [BEAM-10549] Improve runtime type checking performance for the Python SDK

2020-08-05 Thread GitBox


saavannanavati commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r466166035



##
File path: sdks/python/apache_beam/runners/worker/operations.py
##
@@ -238,6 +247,7 @@ def __init__(self,
 self.execution_context = None  # type: Optional[ExecutionContext]
 self.consumers = collections.defaultdict(
 list)  # type: DefaultDict[int, List[Operation]]
+self.producer = None

Review comment:
   Ah thanks, I'm not actually updating the value in the child class, 
`DoOperation`, so let me get rid of that and directly pass `None`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] saavannanavati commented on a change in pull request #12352: [BEAM-10549] Improve runtime type checking performance for the Python SDK

2020-08-05 Thread GitBox


saavannanavati commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r466145176



##
File path: sdks/python/apache_beam/runners/common.py
##
@@ -1340,6 +1342,17 @@ def process_outputs(
 self.per_element_output_counter.add_input(0)
   return
 
+if isinstance(results, (dict, str, unicode, bytes)):
+  results_type = type(results).__name__
+  raise TypeCheckError(
+  'Returning a %s from a ParDo or FlatMap is '
+  'discouraged. Please use list("%s") if you really '
+  'want this behavior.' % (results_type, results))
+elif not isinstance(results, collections.Iterable):

Review comment:
   Why is this check slow? Also, if we remove it, how should we give the 
user the relevant error message?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] damondouglas commented on a change in pull request #12448: [BEAM-9679] Add Additional Parameters lesson to Go SDK Katas

2020-08-05 Thread GitBox


damondouglas commented on a change in pull request #12448:
URL: https://github.com/apache/beam/pull/12448#discussion_r466143420



##
File path: 
learning/katas/go/core_transforms/windowing/additional_parameters/task.md
##
@@ -0,0 +1,77 @@
+
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements.  A window 
is a view into a fixed beginning and 
+fixed end to a set of data.  In the beam model, windowing subdivides a 
PCollection according to the
+timestamps of its individual elements.
+
+Beam can pass information about the window and timestamp to your elements in 
your DoFn.  All your previous
+lessons' DoFn had this information available, yet you never made use of it in 
your DoFn parameters.  In this 
+lesson you will.  The simple toy dataset has five git commit messages and 
their timestamps 

Review comment:
   I like the idea of more windowing lessons after the rest of the 
originally planned katas are complete.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] youngoli commented on a change in pull request #12471: [BEAM-9615] Add initial Schema to Go conversions.

2020-08-05 Thread GitBox


youngoli commented on a change in pull request #12471:
URL: https://github.com/apache/beam/pull/12471#discussion_r466120138



##
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package schema contains utility functions for relating Go types and Beam 
Schemas.
+//
+// Not all Go types can be converted to schemas. This is because Go is more expressive than
+// Beam schemas. Just as not all Go types can be serialized, not all Beam Schemas
+// will have a conversion to Go types, until the correct mechanism exists in the SDK
+// to handle them.
+//
+// While efforts will be made to have conversions be reversible, this will not
+// be possible in all instances. E.g. Go arrays as fields will be converted to
+// Beam Arrays, but a Beam Array type will map by default to a Go slice.
+package schema
+
+import (
+   "fmt"
+   "reflect"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+   "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+// FromType returns a Beam Schema of the passed in type.
+// Returns an error if the type cannot be converted to a Schema.
+func FromType(ot reflect.Type) (*pipepb.Schema, error) {
+   t := ot // keep the original type for errors.
+   // The top level schema for a pointer to struct and the struct is the 
same.
+   if t.Kind() == reflect.Ptr {
+   t = t.Elem()
+   }
+   if t.Kind() != reflect.Struct {
+   return nil, errors.Errorf("cannot convert %v to schema. 
FromType only converts structs to schemas", ot)
+   }
+   return structToSchema(t), nil
+}
+
+func structToSchema(t reflect.Type) *pipepb.Schema {
+   fields := make([]*pipepb.Field, 0, t.NumField())
+   for i := 0; i < t.NumField(); i++ {
+   fields = append(fields, structFieldToField(t.Field(i)))
+   }
+   return &pipepb.Schema{
+   Fields: fields,
+   }
+}
+
+func structFieldToField(sf reflect.StructField) *pipepb.Field {
+   name := sf.Name
+   if tag := sf.Tag.Get("beam"); tag != "" {
+   name, _ = parseTag(tag)
+   }
+   ftype := reflectTypeToFieldType(sf.Type)
+
+   return &pipepb.Field{
+   Name: name,
+   Type: ftype,
+   }
+}
+
+func reflectTypeToFieldType(ot reflect.Type) *pipepb.FieldType {
+   var isPtr bool
+   t := ot
+   if t.Kind() == reflect.Ptr {
+   isPtr = true
+   t = t.Elem()
+   }
+   switch t.Kind() {
+   case reflect.Map:
+   kt := reflectTypeToFieldType(t.Key())
+   vt := reflectTypeToFieldType(t.Elem())
+   return &pipepb.FieldType{
+   Nullable: isPtr,
+   TypeInfo: &pipepb.FieldType_MapType{
+   MapType: &pipepb.MapType{
+   KeyType:   kt,
+   ValueType: vt,
+   },
+   },
+   }
+   case reflect.Struct:
+   sch := structToSchema(t)
+   return &pipepb.FieldType{
+   Nullable: isPtr,
+   TypeInfo: &pipepb.FieldType_RowType{
+   RowType: &pipepb.RowType{
+   Schema: sch,
+   },
+   },
+   }
+   case reflect.Slice, reflect.Array:
+   // Special handling for []byte
+   if t == reflectx.ByteSlice {
+   return &pipepb.FieldType{
+   Nullable: isPtr,
+   TypeInfo: &pipepb.FieldType_AtomicType{
+   AtomicType: pipepb.AtomicType_BYTES,
+   },
+   }
+   }
+   vt := reflectTypeToFieldType(t.Elem())
+   return &pipepb.FieldType{
+   Nullable: isPtr,
+ 

[GitHub] [beam] robinyqiu commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


robinyqiu commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r466122482



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##
@@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, 
Value value) {
 // TODO: Doing micro to mills truncation, need to throw exception.
 ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), 
timeType, false);
 break;
+  case TYPE_DATETIME:
+// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for 
ZetaSQL DATETIME type
+// because later it will be unparsed to the string representation of 
timestamp (e.g. "SELECT
+// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT 
TIMESTAMP '2008-12-25
+// 15:30:00:00'"). So we create a wrapper function here such that 
we can later recognize
+// it and customize its unparsing in BeamBigQuerySqlDialect.
+ret =
+rexBuilder()
+.makeCall(
+SqlOperators.createZetaSqlFunction(
+BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,

Review comment:
   To make it clearer, you can get the `TimestampString` value from the 
`RexLiteral` like 
https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java#L74,
 and then use it to create your own `SqlDateTimeLiteral`.
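
   A minimal sketch of that flow, assuming Calcite's `RexLiteral#getValueAs` and a custom `SqlDateTimeLiteral` like the one sketched in the next message of this thread (both names are illustrative, not the PR's code):

```java
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.TimestampString;

final class DateTimeLiterals {
  // Recover the TimestampString carried by the RexLiteral, then rewrap it in
  // a literal whose unparsing we control.
  static SqlLiteral toDateTimeLiteral(RexLiteral literal) {
    TimestampString ts = literal.getValueAs(TimestampString.class);
    return new SqlDateTimeLiteral(ts, SqlParserPos.ZERO);
  }
}
```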





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robinyqiu commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


robinyqiu commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r466119772



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##
@@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, 
Value value) {
 // TODO: Doing micro to mills truncation, need to throw exception.
 ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), 
timeType, false);
 break;
+  case TYPE_DATETIME:
+// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for 
ZetaSQL DATETIME type
+// because later it will be unparsed to the string representation of 
timestamp (e.g. "SELECT
+// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT 
TIMESTAMP '2008-12-25
+// 15:30:00:00'"). So we create a wrapper function here such that 
we can later recognize
+// it and customize its unparsing in BeamBigQuerySqlDialect.
+ret =
+rexBuilder()
+.makeCall(
+SqlOperators.createZetaSqlFunction(
+BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,

Review comment:
   We don't have to extend `SqlTimestampLiteral`. You can extend 
`SqlLiteral` directly, like in 
https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java#L95
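
   A rough sketch of that approach; the class name, the `TIMESTAMP` type name, and the DATETIME formatting are assumptions rather than the PR's actual code:

```java
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.TimestampString;

class SqlDateTimeLiteral extends SqlLiteral {
  private final TimestampString datetime;

  SqlDateTimeLiteral(TimestampString datetime, SqlParserPos pos) {
    // SqlLiteral's constructor is protected, so a direct subclass works even
    // though SqlTimestampLiteral's constructor is private.
    super(datetime, SqlTypeName.TIMESTAMP, pos);
    this.datetime = datetime;
  }

  @Override
  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
    // Render as a BigQuery DATETIME literal instead of the default TIMESTAMP form.
    writer.literal("DATETIME '" + datetime.toString() + "'");
  }
}
```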





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ZijieSong946 commented on a change in pull request #12348: [BEAM-10240] Support ZetaSQL DATETIME functions in BeamSQL

2020-08-05 Thread GitBox


ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r466111720



##
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##
@@ -850,6 +848,25 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, 
Value value) {
 // TODO: Doing micro to mills truncation, need to throw exception.
 ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), 
timeType, false);
 break;
+  case TYPE_DATETIME:
+// Cannot simply call makeTimestampWithLocalTimeZoneLiteral() for 
ZetaSQL DATETIME type
+// because later it will be unparsed to the string representation of 
timestamp (e.g. "SELECT
+// DATETIME '2008-12-25 15:30:00'" will be unparsed to "SELECT 
TIMESTAMP '2008-12-25
+// 15:30:00:00'"). So we create a wrapper function here such that 
we can later recognize
+// it and customize its unparsing in BeamBigQuerySqlDialect.
+ret =
+rexBuilder()
+.makeCall(
+SqlOperators.createZetaSqlFunction(
+BeamBigQuerySqlDialect.DATETIME_LITERAL_FUNCTION,

Review comment:
   We could not create a class extending `SqlTimestampLiteral` because its constructor is private.
   
https://github.com/apache/calcite/blob/2088488ac8327b19512a76a122cae2961fc551c3/core/src/main/java/org/apache/calcite/sql/SqlTimestampLiteral.java#L34





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

2020-08-05 Thread GitBox


George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466109325



##
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+  dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+  api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+self.test_pipeline = TestPipeline(is_integration_test=True)
+self.project = self.test_pipeline.get_option('project')
+
+# create a temp Dicom store based on the time stamp
+self.temp_dicom_store = "DICOM_store_" + str(time.time())

Review comment:
   Changed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

2020-08-05 Thread GitBox


George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466109042



##
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+  dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+  api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):

Review comment:
   I don't think a test of pub/sub is necessary here because our connector 
has no direct dependency or invocation of pub/sub. The 'pubsub to search 
metadata' converter was tested in the unit test while read and write operations 
can be tested in an easier way here rather than using pub/sub. What's more, 
testing a streaming pipeline is complicated and needs much more work :(





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on pull request #12455: [BEAM-10630] Include data from load tests in the release process

2020-08-05 Thread GitBox


aaltay commented on pull request #12455:
URL: https://github.com/apache/beam/pull/12455#issuecomment-669645454


   LGTM. Thank you @mxm. (I do not know if there were more comments on the dev@ list thread; this LGTM as it is.)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

2020-08-05 Thread GitBox


George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466104978



##
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+  dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+  api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+self.test_pipeline = TestPipeline(is_integration_test=True)
+self.project = self.test_pipeline.get_option('project')
+
+# create a temp Dicom store based on the time stamp
+self.temp_dicom_store = "DICOM_store_" + str(time.time())
+create_dicom_store(self.project, DATA_SET_ID, REGION, 
self.temp_dicom_store)
+client = storage.Client()
+bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+data = json.loads(blob.download_as_string())
+self.expected_output_metadata = data
+
+  def tearDown(self):
+# clean up the temp Dicom store
+delete_dicom_store(self.project, DATA_SET_ID, REGION, 
self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):
+# Search and compare the metadata of a persistent DICOM store.
+input_dict = {}
+input_dict['project_id'] = self.project
+input_dict['region'] = REGION
+input_dict['dataset_id'] = DATA_SET_ID
+input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+input_dict['search_type'] = "instances"
+
+expected_dict = {}
+expected_dict['result'] = self.expected_output_metadata
+expected_dict['status'] = 200
+expected_dict['input'] = input_dict
+expected_dict['success'] = True
+
+with TestPipeline() as p:
+  results = (p | beam.Create([input_dict]) | DicomSearch())
+  assert_that(results, equal_to([expected_dict]))
+
+  @attr('IT')
+  def test_dicom_store_instance(self):
+# Store DICOM files to an empty DICOM store from a GCS bucket,
+# then check if the store metadata match.
+input_dict = {}
+input_dict['project_id'] = self.project
+input_dict['region'] = REGION
+input_dict['dataset_id'] = DATA_SET_ID
+input_dict['dico

[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

2020-08-05 Thread GitBox


George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466104754



##
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+  dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+  api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+self.test_pipeline = TestPipeline(is_integration_test=True)
+self.project = self.test_pipeline.get_option('project')
+
+# create a temp Dicom store based on the time stamp
+self.temp_dicom_store = "DICOM_store_" + str(time.time())
+create_dicom_store(self.project, DATA_SET_ID, REGION, 
self.temp_dicom_store)
+client = storage.Client()
+bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+data = json.loads(blob.download_as_string())
+self.expected_output_metadata = data
+
+  def tearDown(self):
+# clean up the temp Dicom store
+delete_dicom_store(self.project, DATA_SET_ID, REGION, 
self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):
+# Search and compare the metadata of a persistent DICOM store.
+input_dict = {}
+input_dict['project_id'] = self.project
+input_dict['region'] = REGION
+input_dict['dataset_id'] = DATA_SET_ID
+input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+input_dict['search_type'] = "instances"
+
+expected_dict = {}
+expected_dict['result'] = self.expected_output_metadata
+expected_dict['status'] = 200
+expected_dict['input'] = input_dict
+expected_dict['success'] = True
+
+with TestPipeline() as p:
+  results = (p | beam.Create([input_dict]) | DicomSearch())
+  assert_that(results, equal_to([expected_dict]))
+
+  @attr('IT')
+  def test_dicom_store_instance(self):
+# Store DICOM files to an empty DICOM store from a GCS bucket,
+# then check if the store metadata match.
+input_dict = {}
+input_dict['project_id'] = self.project
+input_dict['region'] = REGION
+input_dict['dataset_id'] = DATA_SET_ID
+input_dict['dico

[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

2020-08-05 Thread GitBox


George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466104661



##
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+  dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+  api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+self.test_pipeline = TestPipeline(is_integration_test=True)
+self.project = self.test_pipeline.get_option('project')
+
+# create a temp Dicom store based on the time stamp
+self.temp_dicom_store = "DICOM_store_" + str(time.time())
+create_dicom_store(self.project, DATA_SET_ID, REGION, 
self.temp_dicom_store)
+client = storage.Client()
+bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+data = json.loads(blob.download_as_string())
+self.expected_output_metadata = data
+
+  def tearDown(self):
+# clean up the temp Dicom store
+delete_dicom_store(self.project, DATA_SET_ID, REGION, 
self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):

Review comment:
   test case added





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib opened a new pull request #12477: [BEAM-10646] Simplify SparkPortableExecutionTest.

2020-08-05 Thread GitBox


ibzib opened a new pull request #12477:
URL: https://github.com/apache/beam/pull/12477


   - Remove SparkPortableExecutionTest.testExecution. …
 - Everything SparkPortableExecutionTest.testExecution tests is already 
covered by validates runner tests.
   
   - Don't wait for test to time out if pipeline fails.
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   [rows of Jenkins build-status badges for Go, Java, and Python omitted; the badge URLs are wrapped and truncated in the archive]
[GitHub] [beam] youngoli commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


youngoli commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r466056636



##
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// xlang_wordcount exemplifies using a cross language transform from Python to 
count words
+package main

Review comment:
   This example requires running an expansion service separately in order 
to work, right? I'd add instructions to the package comment on how to run that 
so people can run this example without existing knowledge of how xlang works. 
See the stringsplit example for an example of this. It requires running on a 
job service that supports splitting, so I included instructions for running an 
external job service.

##
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// xlang_wordcount exemplifies using a cross language transform from Python to 
count words
+package main
+
+import (
+   "context"
+   "flag"
+   "fmt"
+   "log"
+   "regexp"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+   "github.com/apache/beam/sdks/go/pkg/beam"
+   "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+   "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+   // Imports to enable correct filesystem access and runner setup in 
LOOPBACK mode
+   _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+   _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+   _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+   // Set this option to choose a different input file or glob.
+   input = flag.String("input", "./input", "File(s) to read.")
+
+   // Set this required option to specify where to write the output.
+   output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+   wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+   empty   = beam.NewCounter("extract", "emptyLines")
+   lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+   lineLen.Update(ctx, int64(len(line)))
+   if len(strings.TrimSpace(line)) == 0 {
+   empty.Inc(ctx, 1)
+   }
+   for _, word := range wordRE.FindAllString(line, -1) {
+   emit(word)
+   }
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+   return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+   beam.RegisterFunction(extractFn)
+   beam.RegisterFunction(formatFn)
+}
+
+func main() {
+   flag.Parse()
+   beam.Init()
+
+   if *output == "" {
+   log.Fatal("No output provided")
+   }
+
+   p := beam.NewPipeline()
+   s := p.Root()
+
+   lines := textio.Read(s, *input)
+   col := beam.ParDo(s, extractFn, lines)
+
+   // Using Cross-language Count from Python's test expansion service
+   // TODO(pskevin): Cleaner user-facing API
+   outputType := typex.NewKV(typex.New(reflectx.String), 
typex.New(reflectx.Int64))
+   external := &beam.ExternalT

[GitHub] [beam] TheNeuralBit merged pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-08-05 Thread GitBox


TheNeuralBit merged pull request #12426:
URL: https://github.com/apache/beam/pull/12426


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12448: [BEAM-9679] Add Additional Parameters lesson to Go SDK Katas

2020-08-05 Thread GitBox


lostluck commented on a change in pull request #12448:
URL: https://github.com/apache/beam/pull/12448#discussion_r466074785



##
File path: 
learning/katas/go/core_transforms/windowing/additional_parameters/task.md
##
@@ -0,0 +1,77 @@
+
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements.  A window 
is a view into a fixed beginning and 
+fixed end to a set of data.  In the beam model, windowing subdivides a 
PCollection according to the
+timestamps of its individual elements.
+
+Beam can pass information about the window and timestamp to your elements in 
your DoFn.  All your previous
+lessons' DoFn had this information available, yet you never made use of it in 
your DoFn parameters.  In this 
+lesson you will.  The simple toy dataset has five git commit messages and 
their timestamps 

Review comment:
   Beam's not passing information to the elements, it's passing it to the 
dofn.
   Consider...
   `A DoFn can request timestamp and windowing information about the element 
it's processing. All your previous lessons had this information available as 
well. In this lesson you will make use of these parameters.`
   
   It's probably a personal nit of mine, but the "yet you" sounds accusatory 
(not saying that was your intent).

##
File path: 
learning/katas/go/core_transforms/windowing/additional_parameters/task.md
##
@@ -0,0 +1,77 @@
+
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements.  A window 
is a view into a fixed beginning and 
+fixed end to a set of data.  In the beam model, windowing subdivides a 
PCollection according to the
+timestamps of its individual elements.

Review comment:
   I don't know how precise we need to be in the katas, but consider adding:
   
   "An element can be a part of one or more windows."

##
File path: 
learning/katas/go/core_transforms/windowing/additional_parameters/task.md
##
@@ -0,0 +1,77 @@
+
+
+# Windowing
+
+This lesson introduces the concept of windowed PCollection elements.  A window 
is a view into a fixed beginning and 
+fixed end to a set of data.  In the beam model, windowing subdivides a 
PCollection according to the
+timestamps of its individual elements.
+
+Beam can pass information about the window and timestamp to your elements in 
your DoFn.  All your previous
+lessons' DoFn had this information available, yet you never made use of it in 
your DoFn parameters.  In this 
+lesson you will.  The simple toy dataset has five git commit messages and 
their timestamps 

Review comment:
   I'd remove either `simple` or `toy`, they're redundant together.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TheNeuralBit commented on pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-08-05 Thread GitBox


TheNeuralBit commented on pull request #12426:
URL: https://github.com/apache/beam/pull/12426#issuecomment-669604112


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] reuvenlax commented on pull request #12474: OrderedListState API

2020-08-05 Thread GitBox


reuvenlax commented on pull request #12474:
URL: https://github.com/apache/beam/pull/12474#issuecomment-669603386


   Run Portable_Python PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb opened a new pull request #12476: [BEAM-10645] Create context for allowing non-parallel dataframe operations.

2020-08-05 Thread GitBox


robertwb opened a new pull request #12476:
URL: https://github.com/apache/beam/pull/12476


   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] TheNeuralBit commented on pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

2020-08-05 Thread GitBox


TheNeuralBit commented on pull request #12426:
URL: https://github.com/apache/beam/pull/12426#issuecomment-669588208


   Run Java PreCommit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #12223: [Beam-4379] Make ParquetIO read splittable

2020-08-05 Thread GitBox


boyuanzz commented on a change in pull request #12223:
URL: https://github.com/apache/beam/pull/12223#discussion_r466020446



##
File path: 
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##
@@ -154,10 +187,15 @@ public static ReadFiles readFiles(Schema schema) {
 
 abstract @Nullable GenericData getAvroDataModel();
 
+abstract boolean getSplit();

Review comment:
   `isSplittable`?

##
File path: 
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##
@@ -187,12 +229,18 @@ public Read withAvroDataModel(GenericData model) {
 @Override
 public PCollection expand(PBegin input) {
   checkNotNull(getFilepattern(), "Filepattern cannot be null.");
-
-  return input
-  .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
-  .apply(FileIO.matchAll())
-  .apply(FileIO.readMatches())
-  .apply(readFiles(getSchema()).withAvroDataModel(getAvroDataModel()));
+  PCollection inputFiles =
+  input
+  .apply(
+  "Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
+  .apply(FileIO.matchAll())
+  .apply(FileIO.readMatches());
+  if (!getSplit()) {

Review comment:
   Should it be `if (getSplit())` ?

##
File path: 
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##
@@ -187,12 +229,18 @@ public Read withAvroDataModel(GenericData model) {
 @Override
 public PCollection expand(PBegin input) {
   checkNotNull(getFilepattern(), "Filepattern cannot be null.");
-
-  return input
-  .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
-  .apply(FileIO.matchAll())
-  .apply(FileIO.readMatches())
-  .apply(readFiles(getSchema()).withAvroDataModel(getAvroDataModel()));
+  PCollection inputFiles =
+  input
+  .apply(
+  "Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
+  .apply(FileIO.matchAll())
+  .apply(FileIO.readMatches());
+  if (!getSplit()) {
+return inputFiles.apply(
+
readFiles(getSchema()).withSplit().withAvroDataModel(getAvroDataModel()));
+  } else {

Review comment:
   We can drop this redundant `else`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-08-05 Thread GitBox


lukecwik commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466022189



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -198,6 +209,102 @@
  *...
  * }
  *
+ * Read from Kafka as a {@link DoFn}
+ *
+ * {@link ReadSourceDescriptors} is the {@link PTransform} that takes a 
PCollection of {@link
+ * KafkaSourceDescriptor} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a>
and the <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is that {@link ReadSourceDescriptors} doesn't require source
descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from a BigQuery table and read these topics via {@link 
ReadSourceDescriptors}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadSourceDescriptors#getConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadSourceDescriptors#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadSourceDescriptors#getOffsetConsumerConfig()} is the same 
as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadSourceDescriptors#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadSourceDescriptors#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadSourceDescriptors#getKeyDeserializerProvider()} is the 
same as {@link
+ *   KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   {@link ReadSourceDescriptors#getValueDeserializerProvider()} is the 
same as {@link
+ *   KafkaIO.Read#getValueDeserializerProvider()}.
+ *   {@link ReadSourceDescriptors#isCommitOffsetEnabled()} has the same 
meaning as {@link
+ *   KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * 
+ *
+ * For example, to create a basic {@link ReadSourceDescriptors} transform:
+ *
+ * {@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescriptor.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *  .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *  .withKeyDeserializer(LongDeserializer.class).
+ *  .withValueDeserializer(StringDeserializer.class));
+ * }
+ *
+ * Note that the {@code bootstrapServers} can also be populated from the {@link
+ * KafkaSourceDescriptor}:
+ *
+ * {@code
+ * pipeline
+ *  .apply(Create.of(
+ *KafkaSourceDescriptor.of(
+ *  new TopicPartition("topic", 1),
+ *  null,
+ *  null,
+ *  ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ *  .apply(KafkaIO.readAll()
+ * .withKeyDeserializer(LongDeserializer.class).
+ * .withValueDeserializer(StringDeserializer.class));
+ * }
+ *
+ * Configurations of {@link ReadSourceDescriptors}
+ *
+ * Apart from the Kafka consumer configurations, there are some other
configurations related
+ * to processing records.
+ *
+ * {@link ReadSourceDescriptors#commitOffsets()} enables committing offset 
after processing the
+ * record. Note that if the {@code isolation.level} is set to "read_committed" 
or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadSourceDescriptors#commitOffsets()} will be ignored.
+ *
+ * {@link 
ReadSourceDescriptors#withExtractOutputTimestampFn(SerializableFunction)} is 
used to
+ * compute the {@code output timestamp} for a given {@link KafkaRecord}. There 
are three built-in
+ * types: {@link ReadSourceDescriptors#withProcessingTime()}, {@link
+ * ReadSourceDescriptors#withCreateTime()} and {@link 
ReadSourceDescriptors#withLogAppendTime()}.

Review comment:
   use an unordered list for the options: `<ul>`

##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -198,6 +209,102 @@
  *...
  * }
  *
+ * Read from Kafka as a {@link DoFn}
+ *
+ * {@link ReadSourceDescriptors} is the {@link PTransform} that takes a 
PCollection of {@link

Review comment:
   We should state the order of preference for how we start reading 
(offset/timestamp/last commit offset) in this block somewhere.

##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * 

[GitHub] [beam] codeBehindMe commented on pull request #12170: [BEAM-10166] Improve execution time errors with more concise error messages from DoFns

2020-08-05 Thread GitBox


codeBehindMe commented on pull request #12170:
URL: https://github.com/apache/beam/pull/12170#issuecomment-669553446


   thank @lostluck , I will implement this today / tomorrow. 
   Apologies, personal commitments have been getting in the way recently.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] codeBehindMe edited a comment on pull request #12170: [BEAM-10166] Improve execution time errors with more concise error messages from DoFns

2020-08-05 Thread GitBox


codeBehindMe edited a comment on pull request #12170:
URL: https://github.com/apache/beam/pull/12170#issuecomment-669553446


   thanks @lostluck , I will implement this today / tomorrow. 
   Apologies, personal commitments have been getting in the way recently.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] apilloud commented on a change in pull request #12475: Update nexmark dashboard links.

2020-08-05 Thread GitBox


apilloud commented on a change in pull request #12475:
URL: https://github.com/apache/beam/pull/12475#discussion_r466019162



##
File path: 
website/www/site/content/en/documentation/sdks/java/testing/nexmark.md
##
@@ -632,30 +632,10 @@ There are dashboards for these runners (others to come):
 - spark
 - flink
 - direct runner
+- Dataflow
 
 Each dashboard contains:
 - graphs in batch mode
 - graphs in streaming mode
 - graphs for all the queries.
 
-### Performance dashboards links
-
-[Nexmark performance direct 
runner](https://apache-beam-testing.appspot.com/explore?dashboard=5084698770407424)

Review comment:
   The equivalent dashboards don't exist yet. I filed 
https://issues.apache.org/jira/browse/BEAM-10643





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tvalentyn commented on a change in pull request #12239: [BEAM-9980] tests tied with Python versions configurable

2020-08-05 Thread GitBox


tvalentyn commented on a change in pull request #12239:
URL: https://github.com/apache/beam/pull/12239#discussion_r466016946



##
File path: sdks/python/test-suites/dataflow/build.gradle
##
@@ -0,0 +1,50 @@
+/*

Review comment:
   It seems that we need to make changes to the top-level suite definitions, e.g.:
https://github.com/apache/beam/blob/ba380270dbc8854ad44f6636a6767317a0416a99/build.gradle#L216
   to point to /test-suites//build.gradle files.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on a change in pull request #12475: Update nexmark dashboard links.

2020-08-05 Thread GitBox


aaltay commented on a change in pull request #12475:
URL: https://github.com/apache/beam/pull/12475#discussion_r466015852



##
File path: 
website/www/site/content/en/documentation/sdks/java/testing/nexmark.md
##
@@ -632,30 +632,10 @@ There are dashboards for these runners (others to come):
 - spark
 - flink
 - direct runner
+- Dataflow
 
 Each dashboard contains:
 - graphs in batch mode
 - graphs in streaming mode
 - graphs for all the queries.
 
-### Performance dashboards links
-
-[Nexmark performance direct 
runner](https://apache-beam-testing.appspot.com/explore?dashboard=5084698770407424)

Review comment:
   Can we deep link similarly in the new dashboards?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tvalentyn commented on a change in pull request #12239: [BEAM-9980] tests tied with Python versions configurable

2020-08-05 Thread GitBox


tvalentyn commented on a change in pull request #12239:
URL: https://github.com/apache/beam/pull/12239#discussion_r466015279



##
File path: sdks/python/test-suites/dataflow/build.gradle
##
@@ -0,0 +1,50 @@
+/*

Review comment:
   Nevermind, I see that they both serve a different purpose.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tvalentyn commented on a change in pull request #12239: [BEAM-9980] tests tied with Python versions configurable

2020-08-05 Thread GitBox


tvalentyn commented on a change in pull request #12239:
URL: https://github.com/apache/beam/pull/12239#discussion_r466012301



##
File path: sdks/python/test-suites/dataflow/build.gradle
##
@@ -0,0 +1,50 @@
+/*

Review comment:
   why do we need build.gradle in addition to common.gradle?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh opened a new pull request #12475: Update nexmark dashboard links.

2020-08-05 Thread GitBox


tysonjh opened a new pull request #12475:
URL: https://github.com/apache/beam/pull/12475


   The App Engine app that hosted the old perf benchmarks was turned down. The 
new dashboards are available at http://metrics.beam.apache.org.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] DanKotowski commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

2020-08-05 Thread GitBox


DanKotowski commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r465976291



##
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+  dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+  HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+  api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+self.test_pipeline = TestPipeline(is_integration_test=True)
+self.project = self.test_pipeline.get_option('project')
+
+# create a temp Dicom store based on the time stamp
+self.temp_dicom_store = "DICOM_store_" + str(time.time())

Review comment:
   If tests are run in parallel, the two stores could potentially end up with
the same name because the name is derived from the current time. It may be best
to generate a random string instead.

##
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_be

[GitHub] [beam] reuvenlax opened a new pull request #12474: OrderedListState API

2020-08-05 Thread GitBox


reuvenlax opened a new pull request #12474:
URL: https://github.com/apache/beam/pull/12474


   This PR adds the API and an in-memory implementation for the
timestamp-ordered list state. The API is currently marked experimental and is
still subject to change.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] youngoli closed pull request #12046: [BEAM-10290] Add a unit(ish) test for dynamic splitting correctness.

2020-08-05 Thread GitBox


youngoli closed pull request #12046:
URL: https://github.com/apache/beam/pull/12046


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia commented on pull request #12433: [BEAM-10613] Implement BIT_AND for Beam SQL ZetaSQL dialect as CombineFn

2020-08-05 Thread GitBox


amaliujia commented on pull request #12433:
URL: https://github.com/apache/beam/pull/12433#issuecomment-669466436


   My impression was bit_and crashed due to this special while others didn't 
(or not tested). That was why bit_and was disabled.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] amaliujia edited a comment on pull request #12433: [BEAM-10613] Implement BIT_AND for Beam SQL ZetaSQL dialect as CombineFn

2020-08-05 Thread GitBox


amaliujia edited a comment on pull request #12433:
URL: https://github.com/apache/beam/pull/12433#issuecomment-669466436


   My impression was bit_and crashed due to this special case while others 
didn't (or not tested). That was why bit_and was disabled.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #12046: [BEAM-10290] Add a unit(ish) test for dynamic splitting correctness.

2020-08-05 Thread GitBox


lostluck commented on pull request #12046:
URL: https://github.com/apache/beam/pull/12046#issuecomment-669466490


   Since the other PR was merged in, this can be closed, right?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #12170: [BEAM-10166] Improve execution time errors with more concise error messages from DoFns

2020-08-05 Thread GitBox


lostluck commented on pull request #12170:
URL: https://github.com/apache/beam/pull/12170#issuecomment-669466247


   This looks good to me! Thank you for the contribution!
   
   Would you mind adding a test to validate this behaviour?
   exec/util_test.go doesn't currently exist, so it would need to be added.
   
   There's no need to test everything in exec/util.go, just callNoPanic.  
   
   You can test three cases:
   * one where the passed-in func(context.Context) error doesn't panic, and
the error is just returned as is (they should compare as equal for a simple
error)
   * another where the passed-in function panics, which should return
an error that contains "panic: " in it
   * finally, one where the error passed to panic is wrapped in your new
doFnError error type
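
   A minimal sketch of what that test could look like, assuming callNoPanic keeps its func(context.Context) error signature in the exec package; the doFnError case is only noted in a comment since that type is introduced by this PR:

   ```go
   package exec

   import (
   	"context"
   	"errors"
   	"strings"
   	"testing"
   )

   func TestCallNoPanic(t *testing.T) {
   	ctx := context.Background()

   	// Case 1: no panic; the error comes back unchanged.
   	want := errors.New("plain error")
   	if got := callNoPanic(ctx, func(context.Context) error { return want }); got != want {
   		t.Errorf("callNoPanic() = %v, want %v", got, want)
   	}

   	// Case 2: a panic is recovered and surfaced as an error mentioning "panic:".
   	err := callNoPanic(ctx, func(context.Context) error { panic("boom") })
   	if err == nil || !strings.Contains(err.Error(), "panic:") {
   		t.Errorf("callNoPanic() = %v, want an error containing %q", err, "panic:")
   	}

   	// Case 3 (sketch): panic with an error wrapped in the PR's new doFnError
   	// type and assert the wrapped details survive the recovery path.
   }
   ```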



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] mxm commented on a change in pull request #12455: [BEAM-10630] Include data from load tests in the release process

2020-08-05 Thread GitBox


mxm commented on a change in pull request #12455:
URL: https://github.com/apache/beam/pull/12455#discussion_r465968286



##
File path: website/www/site/content/en/contribute/release-guide.md
##
@@ -244,7 +247,21 @@ __Attention__: Only PMC has permission to perform this. If 
you are not a PMC, pl
 **
 
 
-## 2. Create a release branch in apache/beam repository
+## 3. Investigate performance regressions
+
+Check the Beam load tests for possible performance regressions. Measurements 
are available on [metrics.beam.apache.org](http://metrics.beam.apache.org).
+
+All Runners which publish data should be checked for the following, in both 
*batch* and *streaming* mode:
+
+- [ParDo](http://metrics.beam.apache.org/d/MOi-kf3Zk/pardo-load-tests) and 
[GBK](http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-test): Runtime, 
latency, checkpoint duration
+- [Nexmark](http://metrics.beam.apache.org/d/ahuaA_zGz/nexmark): Query runtime 
for all queries
+- [IO](http://metrics.beam.apache.org/d/bnlHKP3Wz/java-io-it-tests-dataflow): 
Runtime
+
+If regressions are found, the release branch can still be created, but the 
regressions should be investigated and fixed as part of the release process.
+JIRA issues should be created for each regression with the 'Fix Version' set 
to the to-be-released version.
+Next, the mailing list should be informed to allow fixing the regressions in 
the course of the release.

Review comment:
   I've pushed two more commits to address these two points.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] mxm commented on a change in pull request #12455: [BEAM-10630] Include data from load tests in the release process

2020-08-05 Thread GitBox


mxm commented on a change in pull request #12455:
URL: https://github.com/apache/beam/pull/12455#discussion_r465959895



##
File path: website/www/site/content/en/contribute/release-guide.md
##
@@ -244,7 +247,21 @@ __Attention__: Only PMC has permission to perform this. If 
you are not a PMC, pl
 **
 
 
-## 2. Create a release branch in apache/beam repository
+## 3. Investigate performance regressions
+
+Check the Beam load tests for possible performance regressions. Measurements 
are available on [metrics.beam.apache.org](http://metrics.beam.apache.org).
+
+All Runners which publish data should be checked for the following, in both 
*batch* and *streaming* mode:
+
+- [ParDo](http://metrics.beam.apache.org/d/MOi-kf3Zk/pardo-load-tests) and 
[GBK](http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-test): Runtime, 
latency, checkpoint duration
+- [Nexmark](http://metrics.beam.apache.org/d/ahuaA_zGz/nexmark): Query runtime 
for all queries
+- [IO](http://metrics.beam.apache.org/d/bnlHKP3Wz/java-io-it-tests-dataflow): 
Runtime
+
+If regressions are found, the release branch can still be created, but the 
regressions should be investigated and fixed as part of the release process.
+JIRA issues should be created for each regression with the 'Fix Version' set 
to the to-be-released version.

Review comment:
   I'd like to try filing regression JIRA issues for all listed benchmarks. 
Hopefully, that should not be too many. Like you said, the issues will be 
triaged. If there is nobody taking care of an issue, then ultimately we can 
think about removing the associated benchmark.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] George-Wu opened a new pull request #12473: [BEAM-10642] DICOM API Beam IO connector e2e test

2020-08-05 Thread GitBox


George-Wu opened a new pull request #12473:
URL: https://github.com/apache/beam/pull/12473


   Add integration test for DICOM Beam IO connector
   
   
   R:@pabloem,
   CC:@dranderson1117,@DanKotowski
   

[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465934906



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,144 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string

Review comment:
   Awesome suggestion. Will work on it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pskevin commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465934590



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,144 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {

Review comment:
   The current version was never meant to be representative of what the
final API would be; the API surface was only meant to be refined after the
code organization had been discussed.
   
   Thanks for the new type idea! It'll make separation of concerns much clearer.
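
   For illustration, one possible shape of that separation — hypothetical type names sketched from the discussion, not the actual PR code:

   ```go
   // Sketch only: these would live in the beam package alongside the
   // existing external transform code.
   package beam

   import (
   	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
   )

   // ExternalTransform would keep only the user-supplied configuration.
   type ExternalTransform struct {
   	Urn           string
   	Payload       []byte
   	In            []PCollection
   	Out           []FullType
   	Bounded       bool
   	ExpansionAddr string
   }

   // ExpandedTransform would hold only what the expansion service returns.
   type ExpandedTransform struct {
   	Components   *pipepb.Components
   	Transform    *pipepb.PTransform
   	Requirements []string
   }
   ```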





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] robertwb merged pull request #12459: [BEAM-9547] Simplify pandas implementation.

2020-08-05 Thread GitBox


robertwb merged pull request #12459:
URL: https://github.com/apache/beam/pull/12459


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] yifanmai commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

2020-08-05 Thread GitBox


yifanmai commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-669367028


   Removed sibling deduplication. @robertwb and @aaltay could you take another 
look?
   
   Also, filed [BEAM-10641](https://issues.apache.org/jira/browse/BEAM-10641) 
for adding this to Dataflow.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on pull request #12472: [BEAM-7390] Add sum code snippets

2020-08-05 Thread GitBox


aaltay commented on pull request #12472:
URL: https://github.com/apache/beam/pull/12472#issuecomment-669363089


   LGTM. Please ping me after @rosetn reviews.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aaltay commented on pull request #12453: [BEAM-9421] Add Java snippets to NLP documentation.

2020-08-05 Thread GitBox


aaltay commented on pull request #12453:
URL: https://github.com/apache/beam/pull/12453#issuecomment-669359409


   > @aaltay I think testing this would increase test time and introduce risk 
of flakes - the cloud language analysis results aren't very reliable (since the 
ML model underneath can change) - the most we could test would be if the 
pipeline is correct, running and returns anything.
   
   Snippet tests are not meant to be integration tests, but more like a way to 
check that snippets are still executable over time. Is there anything we can
test without actually making a round trip to the service? If that is not
possible, let's skip the tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

2020-08-05 Thread GitBox


yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465917488



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+  'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+"""Returns a key that will be identical for common siblings."""
+transform_output_keys = list(transform.outputs.keys())
+# Return None as the sibling key for ineligible transforms.
+if len(transform_output_keys
+  ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+  return None
+return SiblingKey(
+spec_urn=transform.spec.urn,
+spec_payload=transform.spec.payload,
+inputs=tuple(transform.inputs.items()),
+environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+transform = only_transform(stage.transforms)
+stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+if sibling_key is None or len(sibling_stages) == 1:
+  continue
+output_pcoll_ids = [
+only_element(stage.transforms[0].outputs.values())
+for stage in sibling_stages
+]
+to_delete_pcoll_ids = output_pcoll_ids[1:]
+for to_delete_pcoll_id in to_delete_pcoll_ids:
+  pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+  del context.components.pcollections[to_delete_pcoll_id]
+del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+for stage in sibling_stages:
+  input_keys_to_remap = []
+  for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+if input_pcoll_id in pcoll_id_remap:
+  input_keys_to_remap.append(input_key)
+  for input_key_to_remap in input_keys_to_remap:
+stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+stage.transforms[0].inputs[input_key_to_remap]]
+  yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+"""A DoFn that unpacks a packed to multiple tagged outputs.
+
+Example:
+  tags = (T1, T2, ...)
+  input = (K, (V1, V2, ...))
+  output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V2)), ...
+"""
+
+def __init__(self, tags):
+  self._tags = tags
+
+def process(self, element):
+  key, values = element
+  return [
+  core.pvalue.TaggedOutput(tag, (key, value))
+  for tag, value in zip(self._tags, values)
+  ]
+
+  def _get_fallback_coder_id():
+return context.add_or_get_coder_id(
+coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+assert index < 2
+if coder.spec.urn == common_urns.coders.KV.urn and len(
+coder.component_coder_ids) == 2:
+  return coder.component_coder_ids[index]
+return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+if a.can_fuse(b, context):
+  return a.fuse(b)
+else:
+  raise ValueError
+
+  def _try_merge_environments(env1, env2):
+if env1 is None:
+  return env2
+elif env2 is None:
+  return env1
+else:
+  if env1 != env2:
+

[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

2020-08-05 Thread GitBox


yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465915603



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+  'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+"""Returns a key that will be identical for common siblings."""
+transform_output_keys = list(transform.outputs.keys())
+# Return None as the sibling key for ineligible transforms.
+if len(transform_output_keys
+  ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+  return None
+return SiblingKey(
+spec_urn=transform.spec.urn,
+spec_payload=transform.spec.payload,
+inputs=tuple(transform.inputs.items()),
+environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+transform = only_transform(stage.transforms)
+stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+if sibling_key is None or len(sibling_stages) == 1:
+  continue
+output_pcoll_ids = [
+only_element(stage.transforms[0].outputs.values())
+for stage in sibling_stages
+]
+to_delete_pcoll_ids = output_pcoll_ids[1:]
+for to_delete_pcoll_id in to_delete_pcoll_ids:
+  pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+  del context.components.pcollections[to_delete_pcoll_id]
+del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+for stage in sibling_stages:
+  input_keys_to_remap = []
+  for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+if input_pcoll_id in pcoll_id_remap:
+  input_keys_to_remap.append(input_key)
+  for input_key_to_remap in input_keys_to_remap:
+stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+stage.transforms[0].inputs[input_key_to_remap]]
+  yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed tuple into multiple tagged outputs.
+
+Example:
+  tags = (T1, T2, ...)
+  input = (K, (V1, V2, ...))
+  output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V2)), ...
+"""
+
+def __init__(self, tags):
+  self._tags = tags
+
+def process(self, element):
+  key, values = element
+  return [
+  core.pvalue.TaggedOutput(tag, (key, value))
+  for tag, value in zip(self._tags, values)
+  ]
+
+  def _get_fallback_coder_id():
+return context.add_or_get_coder_id(
+coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+assert index < 2
+if coder.spec.urn == common_urns.coders.KV.urn and len(
+coder.component_coder_ids) == 2:
+  return coder.component_coder_ids[index]
+return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+if a.can_fuse(b, context):
+  return a.fuse(b)
+else:
+  raise ValueError
+
+  def _try_merge_environments(env1, env2):

Review comment:
   Yup, replaced with `Stage._merge_environments` instead.
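
   For readers skimming the diff above, here is a minimal plain-Python sketch 
of the unpack step performed by `_UnpackFn` (no Beam dependency; the tag names 
and values are illustrative only): a packed element `(key, (v1, v2, ...))` 
fans out to one tagged `(key, value)` pair per original CombinePerKey output.

   ```
   def unpack(element, tags):
       key, values = element
       return [(tag, (key, value)) for tag, value in zip(tags, values)]

   print(unpack(("k", (3, 2)), ("sum_out", "count_out")))
   # [('sum_out', ('k', 3)), ('count_out', ('k', 2))]
   ```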





[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

2020-08-05 Thread GitBox


yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r457799457



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##
@@ -289,6 +289,8 @@ def create_stages(
 phases=[
 translations.annotate_downstream_side_inputs,
 translations.fix_side_input_pcoll_coders,
+translations.eliminate_common_siblings,

Review comment:
   As we discussed in a separate conversation, we would probably need a 
design or RFC for annotating DoFns as idempotent. As such, I've removed 
`eliminate_common_siblings` from this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

2020-08-05 Thread GitBox


yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465914422



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the
+  eliminated stages. This elimination runs only once, not recursively, and
+  will only eliminate the first stage after a common input, rather than a
+  chain of stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+  'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+"""Returns a key that will be identical for common siblings."""
+transform_output_keys = list(transform.outputs.keys())
+# Return None as the sibling key for ineligible transforms.
+if len(transform_output_keys
+  ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+  return None
+return SiblingKey(
+spec_urn=transform.spec.urn,
+spec_payload=transform.spec.payload,
+inputs=tuple(transform.inputs.items()),
+environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+transform = only_transform(stage.transforms)
+stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}

Review comment:
   Noted. I'm not currently aware of any requirement to keep this around 
beyond this optimizer.
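
   A toy plain-Python illustration of that remapping (the ids are made up): 
once the duplicate sibling outputs are deleted, any stage input that 
referenced a deleted PCollection is pointed at the surviving one.

   ```
   pcoll_id_remap = {"pc_dup_1": "pc_keep", "pc_dup_2": "pc_keep"}
   inputs = {"in0": "pc_dup_1", "in1": "pc_other"}
   remapped = {key: pcoll_id_remap.get(pcoll, pcoll) for key, pcoll in inputs.items()}
   print(remapped)  # {'in0': 'pc_keep', 'in1': 'pc_other'}
   ```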





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

2020-08-05 Thread GitBox


yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465913553



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the
+  eliminated stages. This elimination runs only once, not recursively, and
+  will only eliminate the first stage after a common input, rather than a
+  chain of stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+  'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+"""Returns a key that will be identical for common siblings."""
+transform_output_keys = list(transform.outputs.keys())
+# Return None as the sibling key for ineligible transforms.
+if len(transform_output_keys
+  ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+  return None
+return SiblingKey(
+spec_urn=transform.spec.urn,
+spec_payload=transform.spec.payload,
+inputs=tuple(transform.inputs.items()),
+environment_id=transform.environment_id)

Review comment:
   Noted. Deleted code due to removal of `eliminate_common_siblings`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

2020-08-05 Thread GitBox


yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465912431



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the
+  eliminated stages. This elimination runs only once, not recursively, and
+  will only eliminate the first stage after a common input, rather than a
+  chain of stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+  'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+"""Returns a key that will be identical for common siblings."""
+transform_output_keys = list(transform.outputs.keys())
+# Return None as the sibling key for ineligible transforms.
+if len(transform_output_keys
+  ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+  return None
+return SiblingKey(
+spec_urn=transform.spec.urn,
+spec_payload=transform.spec.payload,
+inputs=tuple(transform.inputs.items()),
+environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+transform = only_transform(stage.transforms)
+stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+if sibling_key is None or len(sibling_stages) == 1:
+  continue
+output_pcoll_ids = [
+only_element(stage.transforms[0].outputs.values())
+for stage in sibling_stages
+]
+to_delete_pcoll_ids = output_pcoll_ids[1:]
+for to_delete_pcoll_id in to_delete_pcoll_ids:
+  pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+  del context.components.pcollections[to_delete_pcoll_id]
+del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+for stage in sibling_stages:
+  input_keys_to_remap = []
+  for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+if input_pcoll_id in pcoll_id_remap:
+  input_keys_to_remap.append(input_key)
+  for input_key_to_remap in input_keys_to_remap:
+stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+stage.transforms[0].inputs[input_key_to_remap]]
+  yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed tuple into multiple tagged outputs.
+
+Example:
+  tags = (T1, T2, ...)
+  input = (K, (V1, V2, ...))
+  output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V2)), ...
+"""
+
+def __init__(self, tags):
+  self._tags = tags
+
+def process(self, element):
+  key, values = element
+  return [
+  core.pvalue.TaggedOutput(tag, (key, value))
+  for tag, value in zip(self._tags, values)
+  ]
+
+  def _get_fallback_coder_id():
+return context.add_or_get_coder_id(
+coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+assert index < 2
+if coder.spec.urn == common_urns.coders.KV.urn and len(
+coder.component_coder_ids) == 2:
+  return coder.component_coder_ids[index]
+return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+if a.can_fuse(b, context):
+  return a.fuse(b)
+else:
+  raise ValueError
+
+  def _try_merge_environments(env1, env2):
+if env1 is None:
+  return env2
+elif env2 is None:
+  return env1
+else:
+  if env1 != env2:
+

[GitHub] [beam] davidcavazos commented on pull request #12118: [BEAM-7705] Add BigQuery Java samples

2020-08-05 Thread GitBox


davidcavazos commented on pull request #12118:
URL: https://github.com/apache/beam/pull/12118#issuecomment-669345025


   Hi @kennknowles, tests are passing now, can you please give it a check 
whenever you have a chance? Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-05 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465902065



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
 synchronized (splitLock) {
   if (currentTracker instanceof RestrictionTracker.HasProgress) {
-return ((HasProgress) currentTracker).getProgress();
+Progress progress = ((HasProgress) currentTracker).getProgress();
+double totalWork = progress.getWorkCompleted() + 
progress.getWorkRemaining();
+double completed =
+totalWork * currentWindowIterator.previousIndex() + 
progress.getWorkCompleted();
+double remaining =
+totalWork * (currentElement.getWindows().size() - 
currentWindowIterator.nextIndex())
++ progress.getWorkRemaining();
+return Progress.from(completed, remaining);
+  }
+}
+return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double 
elementCompleted) {
+synchronized (splitLock) {
+  if (currentWindowIterator != null) {

Review comment:
   Originally the idea was that we didn't want the SDK to have to perform 
these calculations, which is why each operator was going to report 
work_completed/work_remaining if it had them, but it seems that accurate 
splitting by fraction needs to take this into account.
   
   Using the graph to compute the progress shouldn't be any more or less 
difficult than the work that is being put into the SDK.
   
   Is there still value in reporting the work_completed/work_remaining metrics 
then?
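
   A hedged plain-Python restatement of the window-scaling arithmetic in the 
diff above (the function and variable names are illustrative, not Beam's): 
progress within the current window is offset by the windows already processed 
and padded by the windows not yet started.

   ```
   def scaled_progress(work_completed, work_remaining, window_index, num_windows):
       # Work per window, assumed identical across windows.
       total = work_completed + work_remaining
       completed = total * window_index + work_completed
       remaining = total * (num_windows - window_index - 1) + work_remaining
       return completed, remaining

   # Second of three windows, 3/10 of the way through the current window:
   print(scaled_progress(3.0, 7.0, window_index=1, num_windows=3))  # (13.0, 17.0)
   ```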
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465879720



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,144 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {

Review comment:
   Per the other thread, please move these to parameters on CrossLanguage 
and TryCrossLanguage instead. Do not try to force compatibility with the 
legacy External; it's OK for them to have two separate calls and paths.
   By having them as a struct it's not clear what is required and what is not, 
and the compiler won't help the user by failing at compile time. 
   
   An aside: The other issue here is you've mixed up user-side parameters with 
internal implementation details, and made them part of the API surface. APIs 
are easiest to use when the user knows how to fill everything and what is 
required or not. The components and Expanded transform and requirements fields 
are not something that users would be filling in for example. Types are cheap. 
Make a new type instead of trying to reuse something that almost fits.

##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+   if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set

Review comment:
   Note, by this comment, the intent was for you to remove this dead code, 
as it's unnecessary.

##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,144 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string

Review comment:
   Move these to a graph.CrossLanguage struct, but have their proto types 
be interface{} instead, with a comment about what the types should be. Given 
those fields are only used by beam framework internals, there's little risk in 
using type assertions for them in the right places, such as the graphx package.

##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+

[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464721180



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+   if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+   // return Legacy External API
+   }
+
+   /*
+   Add ExternalTransform to the Graph
+   */
+   // Validating scope and inputs
+   if !s.IsValid() {
+   // return nil, errors.New("invalid scope")
+   fmt.Println("invalid scope")
+   }
+   for i, col := range e.In {
+   if !col.IsValid() {
+   // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+   fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+   }
+   }
+
+   // Using existing MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+   payload := &graph.Payload{
+   URN:  e.Urn,
+   Data: e.Payload,
+   }
+   var ins []*graph.Node
+   for _, col := range e.In {
+   ins = append(ins, col.n)
+   }
+   edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+   // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+   // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+   if p.ExpandedTransforms == nil {
+   p.ExpandedTransforms = make(map[string]*ExternalTransform)
+   }
+   p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
   As discussed, this data can be part of the graph.External node (or a new 
graph.CrossLanguage struct if desired) which keeps it as part of the graph and 
can be handled appropriately in graphx/translate.go. There's absolutely no need 
to add a new way to pass information in through the pipeline OR via the scope, 
as you suggest.
   
   Use the existing abstraction. If it's not sufficient, please articulate why.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on a change in pull request #12445: [BEAM-9919] Added an External Transform API to Go SDK

2020-08-05 Thread GitBox


lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464721180



##
File path: sdks/go/pkg/beam/external.go
##
@@ -16,10 +16,151 @@
 package beam
 
 import (
+   "context"
+   "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+   jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+   pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+   "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+   idint
+   Urn   string
+   Payload   []byte
+   In[]PCollection
+   Out   []FullType
+   Bounded   bool
+   ExpansionAddr string
+   Components*pipepb.Components
+   ExpandedTransform *pipepb.PTransform
+   Requirements  []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead of one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+   if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+   // return Legacy External API
+   }
+
+   /*
+   Add ExternalTransform to the Graph
+   */
+   // Validating scope and inputs
+   if !s.IsValid() {
+   // return nil, errors.New("invalid scope")
+   fmt.Println("invalid scope")
+   }
+   for i, col := range e.In {
+   if !col.IsValid() {
+   // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+   fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+   }
+   }
+
+   // Using existing MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+   payload := &graph.Payload{
+   URN:  e.Urn,
+   Data: e.Payload,
+   }
+   var ins []*graph.Node
+   for _, col := range e.In {
+   ins = append(ins, col.n)
+   }
+   edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+   // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+   // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+   if p.ExpandedTransforms == nil {
+   p.ExpandedTransforms = make(map[string]*ExternalTransform)
+   }
+   p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
   As discussed, this data can be part of the graph.External node (or a new 
graph.CrossLanguage struct if desired) which keeps it as part of the graph and 
can be handled appropriately in graphx/translate.go. There's absolutely no need 
to add a new way to pass information in through the pipeline.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-05 Thread GitBox


boyuanzz commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465891961



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
 synchronized (splitLock) {
   if (currentTracker instanceof RestrictionTracker.HasProgress) {
-return ((HasProgress) currentTracker).getProgress();
+Progress progress = ((HasProgress) currentTracker).getProgress();
+double totalWork = progress.getWorkCompleted() + 
progress.getWorkRemaining();
+double completed =
+totalWork * currentWindowIterator.previousIndex() + 
progress.getWorkCompleted();
+double remaining =
+totalWork * (currentElement.getWindows().size() - 
currentWindowIterator.nextIndex())
++ progress.getWorkRemaining();
+return Progress.from(completed, remaining);
+  }
+}
+return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double 
elementCompleted) {
+synchronized (splitLock) {
+  if (currentWindowIterator != null) {

Review comment:
   Thanks for the formula. If we let the runner do the computation, the 
runner needs to look up the graph to figure out which progress is from 
downstream. I'm thinking about another option: we can add a progress signal for 
split specifically, which should be reported by the root node of the SDK graph. 
All the computations you have mentioned can be done on the SDK side recursively. 
The runner side can decide whether to look into this signal by knowing whether 
there is an SDF (or SDFs) in the SDK graph.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on pull request #12408: [BEAM-10602] Display Python streaming metrics in Grafana dashboard

2020-08-05 Thread GitBox


tysonjh commented on pull request #12408:
URL: https://github.com/apache/beam/pull/12408#issuecomment-669322906


   I noticed there aren't any results displaying for any other ParDo load tests 
now. Could this change have broken those?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] davidcavazos opened a new pull request #12472: [BEAM-7390] Add sum code snippets

2020-08-05 Thread GitBox


davidcavazos opened a new pull request #12472:
URL: https://github.com/apache/beam/pull/12472


   Add `Sum` code snippets.
   
   R: @aaltay
   R: @rosetn
   
   Staged: 
http://apache-beam-website-pull-requests.storage.googleapis.com/12462/documentation/transforms/python/aggregation/sum/index.html
   
   
   

[GitHub] [beam] lukecwik commented on a change in pull request #12367: [BEAM-10564] Support more Avro field name formats when mapping to Jav…

2020-08-05 Thread GitBox


lukecwik commented on a change in pull request #12367:
URL: https://github.com/apache/beam/pull/12367#discussion_r465884172



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##
@@ -601,12 +601,26 @@ private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundE
 private Map getMapping(Schema schema) {
   Map mapping = Maps.newHashMap();
   for (Field field : schema.getFields()) {
-String underscore = 
CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName());
-mapping.put(underscore, field.getName());
+String fieldName = field.getName();
+String getter;
+if (fieldName.contains("_")) {
+  if (Character.isLowerCase(fieldName.charAt(0))) {
+// field_name -> fieldName
+getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, 
fieldName);
+  } else {
+// FIELD_NAME -> fIELDNAME
+getter = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, 
fieldName.replace("_", ""));

Review comment:
   ```suggestion
   getter = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, 
fieldName.replace("_", ""));
   ```
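
   For context, a plain-Python sketch of the mapping the suggestion aims for 
(the function is illustrative; the actual code uses Guava's CaseFormat): both 
`field_name` and `FIELD_NAME` should resolve to the getter name `fieldName`.

   ```
   def getter_name(field_name):
       if "_" not in field_name:
           return field_name
       parts = field_name.split("_")
       # field_name -> fieldName, FIELD_NAME -> fieldName
       return parts[0].lower() + "".join(p.capitalize() for p in parts[1:])

   print(getter_name("field_name"), getter_name("FIELD_NAME"))  # fieldName fieldName
   ```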





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kkucharc commented on pull request #12435: [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow

2020-08-05 Thread GitBox


kkucharc commented on pull request #12435:
URL: https://github.com/apache/beam/pull/12435#issuecomment-669316184


   R: @tysonjh would you find time to take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck commented on pull request #12471: [BEAM-9615] Add initial Schema to Go conversions.

2020-08-05 Thread GitBox


lostluck commented on pull request #12471:
URL: https://github.com/apache/beam/pull/12471#issuecomment-669307726


   R: @youngoli 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck opened a new pull request #12471: [BEAM-9615] Add initial Schema to Go conversions.

2020-08-05 Thread GitBox


lostluck opened a new pull request #12471:
URL: https://github.com/apache/beam/pull/12471


   Adds initial Schema to Go type conversions and vice versa.
   
   In particular, handles the "easy" stuff: basic struct to Row conversion, 
slices and maps, and basic field name conversions.
   
   Doesn't implement struct+schema compatibility comparisons, which are 
necessary at pipeline construction time; however, this is more than sufficient 
to allow other immediately incoming code to unlock the row coder tests in 
standard_coders.yaml.
   
   
   

[GitHub] [beam] apilloud merged pull request #12416: Update google-api-services versions.

2020-08-05 Thread GitBox


apilloud merged pull request #12416:
URL: https://github.com/apache/beam/pull/12416


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh edited a comment on pull request #12416: Update google-api-services versions.

2020-08-05 Thread GitBox


tysonjh edited a comment on pull request #12416:
URL: https://github.com/apache/beam/pull/12416#issuecomment-669303736


   > > Failed to publish publication 'mavenJava' to repository 'mavenLocal'
   > > > Invalid publication 'mavenJava': supplied version does not match POM 
file (cannot edit version directly in the POM file).
   > > 
   > > 
   > > Not sure what this means, will need to investigate unless you know.
   > 
   > May just need to run a ./gradlew clean 
(https://issues.apache.org/jira/browse/BEAM-9437). Trying that.
   
   What a mess. I spent a good amount of time, ~3hr or so, trying to get the 
beam-linkage-check.sh to work. I ran into numerous issues from a problem with 
the ':sdks:java:bom:publishMavenJavaPublicationToMavenLocal' task, problems 
building flink tests for 1.8 and 1.9, apex runner tasks executing even though 
the runner is gone, major/minor version mismatches in Java classes, unsupported 
errorprone versions. I reached out to @lukecwik and @apilloud for help as 
well (thank you both).
   
   What worked in the end:
 * ensure using Java 8 (both binary and classpath)
 * a fresh new clone of apache/beam
   
   https://gist.github.com/tysonjh/d8d28689fc18b536b32498cb0fd27382



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] tysonjh commented on pull request #12416: Update google-api-services versions.

2020-08-05 Thread GitBox


tysonjh commented on pull request #12416:
URL: https://github.com/apache/beam/pull/12416#issuecomment-669303736


   > > Failed to publish publication 'mavenJava' to repository 'mavenLocal'
   > > > Invalid publication 'mavenJava': supplied version does not match POM 
file (cannot edit version directly in the POM file).
   > > 
   > > 
   > > Not sure what this means, will need to investigate unless you know.
   > 
   > May just need to run a ./gradlew clean 
(https://issues.apache.org/jira/browse/BEAM-9437). Trying that.
   
   What a mess. I spent a good amount of time, ~3hr or so, trying to get the 
beam-linkage-check.sh to work. I ran into numerous issues from a problem with 
the ':sdks:java:bom:publishMavenJavaPublicationToMavenLocal' task, problems 
building flink tests for 1.8 and 1.9, apex runner tasks executing even though 
the runner is gone, major/minor version mismatches in Java classes, unsupported 
errorprone versions. I reached out to @lukecwik and @apilloud for help as well 
(thank you both).
   
   What worked in the end:
 * ensure using Java 8 (both binary and classpath)
 * a fresh clone of apache/beam
   
   https://gist.github.com/tysonjh/d8d28689fc18b536b32498cb0fd27382



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-05 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465855425



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
 synchronized (splitLock) {
   if (currentTracker instanceof RestrictionTracker.HasProgress) {
-return ((HasProgress) currentTracker).getProgress();
+Progress progress = ((HasProgress) currentTracker).getProgress();
+double totalWork = progress.getWorkCompleted() + 
progress.getWorkRemaining();
+double completed =
+totalWork * currentWindowIterator.previousIndex() + 
progress.getWorkCompleted();
+double remaining =
+totalWork * (currentElement.getWindows().size() - 
currentWindowIterator.nextIndex())
++ progress.getWorkRemaining();
+return Progress.from(completed, remaining);
+  }
+}
+return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double 
elementCompleted) {
+synchronized (splitLock) {
+  if (currentWindowIterator != null) {

Review comment:
   Yes, truncate exposes some of the issues where a non-root SDF makes 
things interesting.
   
   For option 2, if we assume that `inprogress` is included in `remaining` then:
   ```
   total = completed + remaining
   fraction_completed = (inprogress / remaining) * downstream_fraction_completed + completed / total
   ```
   `downstream_fraction_completed` would be computed recursively and could also 
be computed effectively if there were multiple consumers with 
`(completed_consumers + downstream_fraction_completed) / total_num_consumers`
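
   As a worked example of that formula (plain Python; the numbers and the 
single-consumer shape are assumptions for illustration):

   ```
   def fraction_completed(completed, remaining, inprogress, downstream_fraction):
       # `inprogress` is assumed to be included in `remaining`, as stated above.
       total = completed + remaining
       return (inprogress / remaining) * downstream_fraction + completed / total

   # 5 of 15 units fully done; 1 of the 10 remaining is in progress and its
   # downstream processing is half complete:
   print(fraction_completed(5, 10, 1, 0.5))  # ~0.3833
   ```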





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lostluck merged pull request #12467: [BEAM-7996] Add map & nil encoding to Go SDK.

2020-08-05 Thread GitBox


lostluck merged pull request #12467:
URL: https://github.com/apache/beam/pull/12467


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on a change in pull request #12452:
URL: https://github.com/apache/beam/pull/12452#discussion_r465840738



##
File path: .github/workflows/python_tests.yml
##
@@ -0,0 +1,196 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Python tests
+
+on:
+  schedule:
+- cron: '10 2 * * *'
+  push:
+branches: ['master', 'release-*']
+tags: 'v*'
+  pull_request:
+branches: ['master', 'release-*']
+tags: 'v*'
+paths: ['sdks/python/**', 'model/**']
+
+
+env:
+  GCP_WHEELS_STAGING_PATH: "gs://${{ secrets.GCP_BUCKET 
}}/${GITHUB_REF##*/}/${GITHUB_SHA}-${GITHUB_RUN_ID}/"
+  GCP_BUCKET: ${{ secrets.GCP_BUCKET }}  # beam-wheels-staging
+  GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}  # apache-beam-testing
+  GCP_REGION: ${{ secrets.GCP_REGION}}  # us-central
+
+jobs:
+
+  build_python_sdk_source:
+name: 'Build Python SDK Source'
+if: github.repository_owner == 'apache' && (github.event_name == 'push' || 
github.event_name == 'schedule')

Review comment:
   
   
   > Does GitHub impose any limits on how many workflows can run in parallel?
   
   Limits for concurrent jobs are between 20 and 180, depending on the plan 
([docs](https://docs.github.com/en/actions/getting-started-with-github-actions/about-github-actions#usage-limits)).
   I heard that Apache has an Enterprise plan for GitHub Actions but I am not sure 
about it. Do you have more information about it, @gmcdonald?
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on pull request #12452:
URL: https://github.com/apache/beam/pull/12452#issuecomment-669283521


   @tvalentyn thanks for the review.
   
   > * Is there a way for an author or a reviewer to rerun the action to retry 
the test, or is the only way to re-push the PR?
   
   There is an event type called `workflow_dispatch` which enables a button to 
manually trigger the workflow; I just added it.
   
https://github.blog/changelog/2020-07-06-github-actions-manual-triggers-with-workflow_dispatch/
   
   > * When we run unit tests, we create xml files with test results (see 
`generated xml file: D:\a\beam\beam\sdks\python\pytest_py36-win.xml ` entries 
in console logs). It would be great if we could somehow visualize that to see 
failing tests/messages easily without having to dig through console logs. Are 
there any options to do that? This could arrive in future PRs.
   
   I am not sure about visualisation; I have to investigate it in more depth. 
However, I added uploads of the XML files as artifacts.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aromanenko-dev commented on a change in pull request #12422: [BEAM-601] Run KinesisIOIT with localstack

2020-08-05 Thread GitBox


aromanenko-dev commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r465702081



##
File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java
##
@@ -39,16 +40,27 @@
   private final String secretKey;
   private final Regions region;
   private final @Nullable String serviceEndpoint;
+  private final boolean verifyCertificate;
 
   BasicKinesisProvider(
-  String accessKey, String secretKey, Regions region, @Nullable String 
serviceEndpoint) {
+  String accessKey,
+  String secretKey,
+  Regions region,
+  @Nullable String serviceEndpoint,
+  boolean verifyCertificate) {
 checkArgument(accessKey != null, "accessKey can not be null");
 checkArgument(secretKey != null, "secretKey can not be null");
 checkArgument(region != null, "region can not be null");
 this.accessKey = accessKey;
 this.secretKey = secretKey;
 this.region = region;
 this.serviceEndpoint = serviceEndpoint;
+this.verifyCertificate = verifyCertificate;
+  }
+
+  BasicKinesisProvider(

Review comment:
   Why did you decide to modify `BasicKinesisProvider` rather than create a 
new provider class for testing that extends this basic one?

##
File path: 
sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##
@@ -95,32 +123,114 @@ private void runRead() {
 PCollection output =
 pipelineRead.apply(
 KinesisIO.read()
-.withStreamName(options.getAwsKinesisStream())
+.withStreamName(streamName)
 .withAWSClientsProvider(
 options.getAwsAccessKey(),
 options.getAwsSecretKey(),
-Regions.fromName(options.getAwsKinesisRegion()))
-.withMaxNumRecords(numberOfRows)
+Regions.fromName(options.getAwsKinesisRegion()),
+options.getAwsServiceEndpoint(),
+options.getAwsVerifyCertificate())
+.withMaxNumRecords(options.getNumberOfRecords())
 // to prevent endless running in case of error
-.withMaxReadTime(Duration.standardMinutes(10))
-
.withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
-.withInitialTimestampInStream(now)
+.withMaxReadTime(Duration.standardMinutes(10L))
+
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
 .withRequestRecordsLimit(1000));
 
 PAssert.thatSingleton(output.apply("Count All", Count.globally()))
-.isEqualTo((long) numberOfRows);
+.isEqualTo((long) options.getNumberOfRecords());
 
 PCollection consolidatedHashcode =
 output
 .apply(ParDo.of(new ExtractDataValues()))
 .apply("Hash row contents", Combine.globally(new 
HashingFn()).withoutDefaults());
 
 PAssert.that(consolidatedHashcode)
-.containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
 
 pipelineRead.run().waitUntilFinish();
   }
 
+  /** Necessary setup for localstack environment. */
+  private static void setupLocalstack() {
+
System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,
 "true");
+
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, 
"true");
+
+localstackContainer =
+new LocalStackContainer(LOCALSTACK_VERSION)
+.withServices(LocalStackContainer.Service.KINESIS)
+.withEnv("USE_SSL", "true")
+.withStartupAttempts(3);
+localstackContainer.start();
+
+options.setAwsServiceEndpoint(
+localstackContainer
+.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+.getServiceEndpoint()
+.replace("http", "https"));
+options.setAwsKinesisRegion(
+localstackContainer
+.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+.getSigningRegion());
+options.setAwsAccessKey(
+
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
+options.setAwsSecretKey(
+
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
+options.setNumberOfRecords(1000);
+options.setNumberOfShards(1);
+options.setAwsKinesisStream("beam_kinesis_test");
+options.setAwsVerifyCertificate(false);
+  }
+
+  private static AmazonKinesis createKinesisClient() {
+AmazonKinesisClientBuilder clientBuilder = 
AmazonKinesisClientBuilder.standard();
+
+AWSCredentialsProvider credentialsProvider =
+new AWSStaticCredentialsProvider(
+new BasicAWSCredentials(options.getAwsAccessKey(), 
options.getAwsSecretKey()));
+clientBuil

[GitHub] [beam] kkucharc commented on pull request #12435: [WIP] [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow

2020-08-05 Thread GitBox


kkucharc commented on pull request #12435:
URL: https://github.com/apache/beam/pull/12435#issuecomment-669252849


   Run Python Load Tests ParDo Flink Streaming



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kkucharc commented on pull request #12435: [WIP] [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow

2020-08-05 Thread GitBox


kkucharc commented on pull request #12435:
URL: https://github.com/apache/beam/pull/12435#issuecomment-669252738


   Run Python Load Tests ParDo Dataflow Batch



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kkucharc merged pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK

2020-08-05 Thread GitBox


kkucharc merged pull request #12151:
URL: https://github.com/apache/beam/pull/12151


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kkucharc commented on pull request #12151: [BEAM-9896] Add streaming for SnowflakeIO.Write to Java SDK

2020-08-05 Thread GitBox


kkucharc commented on pull request #12151:
URL: https://github.com/apache/beam/pull/12151#issuecomment-669206144


   I updated CHANGES.md as requested.
   It is approved by @pabloem so I am merging this PR. Thank you a lot Pablo 
for the review! :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kkucharc commented on pull request #12435: [WIP] [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow

2020-08-05 Thread GitBox


kkucharc commented on pull request #12435:
URL: https://github.com/apache/beam/pull/12435#issuecomment-669223084


   run seed job



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on a change in pull request #12452:
URL: https://github.com/apache/beam/pull/12452#discussion_r465753725



##
File path: sdks/python/apache_beam/typehints/typecheck_test_py3.py
##
@@ -92,23 +93,29 @@ def test_wrapper_pass_through(self):
 # We use a file to check the result because the MyDoFn instance passed is
 # not the same one that actually runs in the pipeline (it is serialized
 # here and deserialized in the worker).
-with tempfile.NamedTemporaryFile(mode='w+t') as f:
-  dofn = MyDoFn(f.name)
+
+fd, path = tempfile.mkstemp()
+try:
+  os.close(fd)

Review comment:
   WDYT about adding a comment with the JIRA reference in such places? (as you mentioned creating a JIRA in https://github.com/apache/beam/pull/12452#discussion_r465209691)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on pull request #12452:
URL: https://github.com/apache/beam/pull/12452#issuecomment-669219601


   @tvalentyn answering your question:
   
   > Also, all actions workflow run in parallel, right? so it takes roughly the 
same time as running a regular precommit? In such case we could consider 
running it on PR.
   
   Building the SDK source dist is used as a prerequisite for the `python_wordcount_dataflow` job. Building the source distribution is already validated for PRs in the `.github/workflows/build_wheels.yml` workflow.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on a change in pull request #12452:
URL: https://github.com/apache/beam/pull/12452#discussion_r465736828



##
File path: sdks/python/apache_beam/io/parquetio_test.py
##
@@ -296,8 +296,10 @@ def test_sink_transform_int96(self):
   path, self.SCHEMA96, num_shards=1, shard_name_template='')
 
   def test_sink_transform(self):
-with tempfile.NamedTemporaryFile() as dst:
-  path = dst.name
+fd, path = tempfile.mkstemp()

Review comment:
   There was a problem on Windows since `with tempfile.NamedTemporaryFile() as dst` provides a `tempfile` which is already opened. On Windows any opened file seems to be read-only, which caused a PermissionError. I know this workaround is ugly, but it is the best way I found to do it.
   Won't a separate commit with the JIRA reference be lost after merging?
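   For illustration, a minimal, self-contained Python sketch of the Windows-safe pattern this change uses (the payload written here is made up; the real change lives in `parquetio_test.py`):
   ```python
   import os
   import tempfile

   # Create the temp file, then close the OS-level handle right away so the
   # path can be reopened for writing -- on Windows an already-open file
   # cannot be opened again, which is what broke NamedTemporaryFile here.
   fd, path = tempfile.mkstemp()
   try:
       os.close(fd)
       with open(path, 'w') as f:  # reopening now works on every platform
           f.write('example payload')
   finally:
       # Unlike NamedTemporaryFile, mkstemp leaves cleanup to the caller.
       os.remove(path)
   ```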





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] prodonjs commented on a change in pull request #12460: [BEAM-10545] HtmlView module

2020-08-05 Thread GitBox


prodonjs commented on a change in pull request #12460:
URL: https://github.com/apache/beam/pull/12460#discussion_r465764186



##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/common/HtmlView.tsx
##
@@ -0,0 +1,119 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+import * as React from 'react';
+
+export interface IHtmlProvider {
+  readonly html: string;
+  readonly script: Array<string>;

Review comment:
   *nit* I think it's generally preferred to use `string[]` for primitive 
array types.

##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/common/HtmlView.tsx
##
@@ -0,0 +1,119 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+import * as React from 'react';
+
+export interface IHtmlProvider {
+  readonly html: string;
+  readonly script: Array<string>;
+}
+
+interface IHtmlViewProps {
+  htmlProvider: IHtmlProvider;
+}
+
+interface IHtmlViewState {
+  innerHtml: string;
+  script: Array<string>;
+}
+
+/**
+ * A common HTML viewing component that renders given HTML and executes scripts
+ * from the given provider.
+ */
+export class HtmlView extends React.Component<IHtmlViewProps, IHtmlViewState> {
+  constructor(props: IHtmlViewProps) {
+super(props);
+this.state = {
+  innerHtml: props.htmlProvider.html,
+  script: []
+};
+  }
+
+  componentDidMount(): void {
+this._updateRenderTimerId = setInterval(() => this.updateRender(), 1000);
+  }
+
+  componentWillUnmount(): void {
+clearInterval(this._updateRenderTimerId);
+  }
+
+  updateRender(): void {
+const currentHtml = this.state.innerHtml;
+const htmlToUpdate = this.props.htmlProvider.html;
+const currentScript = this.state.script;
+const scriptToUpdate = [...this.props.htmlProvider.script];
+if (htmlToUpdate !== currentHtml) {
+  this.setState({
+innerHtml: htmlToUpdate,
+// As long as the html is updated, clear the script state.
+script: []
+  });
+}
+/* Depending on whether this iteration updates the html, the scripts
+ * are executed differently.
+ * Html updated: all scripts are new, start execution from index 0;
+ * Html not updated: only newly added scripts need to be executed.
+ */
+const currentScriptLength =
+  htmlToUpdate === currentHtml ? currentScript.length : 0;
+if (scriptToUpdate.length > currentScriptLength) {
+  this.setState(
+{
+  script: scriptToUpdate
+},
+// Executes scripts once the state is updated.
+() => {
+  for (let i = currentScriptLength; i < scriptToUpdate.length; ++i) {
+new Function(scriptToUpdate[i])();
+  }
+}
+  );
+}
+  }
+
+  render(): React.ReactNode {
+return (
+  // This injects raw HTML fetched from kernel into JSX.
+  <div dangerouslySetInnerHTML={{ __html: this.state.innerHtml }} />

Review comment:
   Haha as the property name suggests, this is definitely a risky operation 
from a security perspective since the markup being injected has the ability to 
run scripts in the user's context with access to whatever resources their 
credentials provide.
   
   Is there any alternative that could be considered? Could you create an `<iframe>` element and render the HTML there so that it's sandboxed? I don't have any fundamental objections to this as I don't know the full context, but I know this type of thing has been looked upon with significant concern by our internal security reviewers and has implications for us being able to include certain types of extensions in our enterprise product.
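   
   To make the control flow in `updateRender` above easier to follow, here is the same incremental-execution idea restated as a small Python sketch (the class and provider names are invented for illustration; the real implementation is the TypeScript above):
   ```python
   class HtmlRenderer:
       """Python restatement of HtmlView.updateRender, for illustration only."""

       def __init__(self, provider):
           # `provider` is assumed to expose .html (str) and .script (list of str).
           self.provider = provider
           self.inner_html = provider.html
           self.scripts = []

       def update_render(self):
           html_to_update = self.provider.html
           scripts_to_update = list(self.provider.script)
           # HTML changed: every script is new, so execution restarts at index 0.
           # HTML unchanged: only scripts appended since the last tick are run.
           start = len(self.scripts) if html_to_update == self.inner_html else 0
           self.inner_html = html_to_update
           if len(scripts_to_update) > start:
               self.scripts = scripts_to_update
               for script in scripts_to_update[start:]:
                   exec(script)  # plays the role of `new Function(script)()`
   ```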

##
File path: 
sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/common/HtmlView.tsx
##
@@ -0,0 +1,119 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http:/

[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on a change in pull request #12452:
URL: https://github.com/apache/beam/pull/12452#discussion_r465774681



##
File path: sdks/python/apache_beam/runners/worker/log_handler_test.py
##
@@ -87,7 +87,8 @@ def _verify_fn_log_handler(self, num_log_entries):
 self.assertEqual(
 '%s: %s' % (msg, num_received_log_entries), log_entry.message)
 self.assertTrue(
-re.match(r'.*/log_handler_test.py:\d+', log_entry.log_location),
+re.match(
+r'.*(/|\\)log_handler_test.py:\d+', log_entry.log_location),

Review comment:
   I used `rf'.*{os.sep}log_handler_test.py:\d+'` but it caused:
   ```
   >   raise source.error("bad escape %s" % escape, len(escape))
   E   sre_constants.error: bad escape \l at position 2
   ```
   https://github.com/apache/beam/pull/12452/checks?check_run_id=949595653
   I reverted the change.
   
   Should I use it somehow differently?
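   
   A minimal sketch of the `re.escape(os.sep)` variant that sidesteps the bad-escape error (the `log_location` value here is invented for illustration, not taken from the test):
   ```python
   import os
   import re

   # On Windows os.sep is '\\', so interpolating it directly into the pattern
   # produces the invalid escape '\l' in '\log_handler_test.py'. Escaping the
   # separator first keeps the pattern valid on every platform.
   pattern = rf'.*{re.escape(os.sep)}log_handler_test.py:\d+'

   log_location = os.sep.join(
       ['apache_beam', 'runners', 'worker', 'log_handler_test.py:87'])
   assert re.match(pattern, log_location)
   ```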





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on a change in pull request #12452:
URL: https://github.com/apache/beam/pull/12452#discussion_r465754369



##
File path: .github/workflows/python_tests.yml
##
@@ -0,0 +1,196 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Python tests
+
+on:
+  schedule:
+- cron: '10 2 * * *'
+  push:
+branches: ['master', 'release-*']
+tags: 'v*'
+  pull_request:
+branches: ['master', 'release-*']
+tags: 'v*'
+paths: ['sdks/python/**', 'model/**']
+
+
+env:
+  GCP_WHEELS_STAGING_PATH: "gs://${{ secrets.GCP_BUCKET }}/${GITHUB_REF##*/}/${GITHUB_SHA}-${GITHUB_RUN_ID}/"

Review comment:
   It is an unnecessary copy-paste leftover. Deleted.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kkucharc commented on pull request #12435: [WIP] [BEAM-10616] Added Python Pardo load tests for streaming on Dataflow

2020-08-05 Thread GitBox


kkucharc commented on pull request #12435:
URL: https://github.com/apache/beam/pull/12435#issuecomment-669225991


   Run Python Load Tests ParDo Dataflow Streaming



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on a change in pull request #12452:
URL: https://github.com/apache/beam/pull/12452#discussion_r465730296



##
File path: .github/workflows/python_tests.yml
##
@@ -0,0 +1,196 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Python tests
+
+on:
+  schedule:
+- cron: '10 2 * * *'
+  push:
+branches: ['master', 'release-*']
+tags: 'v*'
+  pull_request:
+branches: ['master', 'release-*']
+tags: 'v*'
+paths: ['sdks/python/**', 'model/**']
+
+
+env:
+  GCP_WHEELS_STAGING_PATH: "gs://${{ secrets.GCP_BUCKET }}/${GITHUB_REF##*/}/${GITHUB_SHA}-${GITHUB_RUN_ID}/"
+  GCP_BUCKET: ${{ secrets.GCP_BUCKET }}  # beam-wheels-staging
+  GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}  # apache-beam-testing
+  GCP_REGION: ${{ secrets.GCP_REGION}}  # us-central
+
+jobs:
+
+  build_python_sdk_source:
+name: 'Build Python SDK Source'
+if: github.repository_owner == 'apache' && (github.event_name == 'push' || 
github.event_name == 'schedule')
+runs-on: ubuntu-latest
+steps:
+  - name: Checkout code
+uses: actions/checkout@v2
+  - name: Install python
+uses: actions/setup-python@v2
+with:
+  python-version: 3.7
+  - name: Get build dependencies
+working-directory: ./sdks/python
+run: pip install -r build-requirements.txt
+  - name: Install wheels
+run: pip install wheel
+  - name: Build source
+working-directory: ./sdks/python
+run: python setup.py sdist --formats=zip
+  - name: Add checksums
+working-directory: ./sdks/python/dist
+run: |
+  file=$(ls | grep .zip | head -n 1)
+  sha512sum $file > ${file}.sha512
+  - name: Unzip source
+working-directory: ./sdks/python
+run: unzip dist/$(ls dist | grep .zip | head -n 1)
+  - name: Rename source directory
+working-directory: ./sdks/python
+run: mv $(ls | grep apache-beam) apache-beam-source
+  - name: Upload compressed sources as artifacts
+uses: actions/upload-artifact@v2
+with:
+  name: source_zip
+  path: sdks/python/dist
+
+  python_unit_tests:
+name: 'Python Unit Tests'
+runs-on: ${{ matrix.os }}
+strategy:
+  fail-fast: false
+  matrix:
+os: [ubuntu-latest, macos-latest, windows-latest]
+params: [
+  {"py_ver": "3.5", "tox_env": "py35"},
+  {"py_ver": "3.6", "tox_env": "py36"},
+  {"py_ver": "3.7", "tox_env": "py37"},
+  {"py_ver": "3.8", "tox_env": "py38"},
+]
+exclude:
+  # TODO remove exclusion after issue with protobuf is solved
+  # https://github.com/protocolbuffers/protobuf/issues/7765
+  - os: windows-latest
+params: {"py_ver": "3.8", "tox_env": "py38"}
+steps:
+  - name: Checkout code
+uses: actions/checkout@v2
+  - name: Install python
+uses: actions/setup-python@v2
+with:
+  python-version: ${{ matrix.params.py_ver }}
+  - name: Get build dependencies
+working-directory: ./sdks/python
+run: pip install -r build-requirements.txt
+  - name: Install tox
+run: pip install tox
+  - name: Run tests basic unix
+if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos')
+working-directory: ./sdks/python
+run: tox -c tox.ini -e ${{ matrix.params.tox_env }}
+  - name: Run tests basic windows
+if: startsWith(matrix.os, 'windows')
+working-directory: ./sdks/python
+run: tox -c tox.ini -e ${{ matrix.params.tox_env }}-win
+
+  python_wordcount_direct_runner:
+name: 'Python Wordcount Direct Runner'
+runs-on: ${{ matrix.os }}
+strategy:
+  fail-fast: false
+  matrix:
+os: [ubuntu-latest, macos-latest, windows-latest]
+python: [3.5, 3.6, 3.7, 3.8]
+exclude:
+  # TODO remove exclusion after issue with protobuf is solved
+  # https://github.com/protocolbuffers/protobuf/issues/7765
+  - os: windows-latest
+python: 3.8
+steps:

Review comment:
   Right now it is not possible, however GitHub Actions is evolving so fast

[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on a change in pull request #12452:
URL: https://github.com/apache/beam/pull/12452#discussion_r465753326



##
File path: sdks/python/apache_beam/runners/worker/log_handler_test.py
##
@@ -87,7 +87,8 @@ def _verify_fn_log_handler(self, num_log_entries):
 self.assertEqual(
 '%s: %s' % (msg, num_received_log_entries), log_entry.message)
 self.assertTrue(
-re.match(r'.*/log_handler_test.py:\d+', log_entry.log_location),
+re.match(
+r'.*(/|\\)log_handler_test.py:\d+', log_entry.log_location),

Review comment:
   Added fix. Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] TobKed commented on a change in pull request #12452: [BEAM-10623] Add workflow to run Beam python tests on Linux/Windows/Mac platforms

2020-08-05 Thread GitBox


TobKed commented on a change in pull request #12452:
URL: https://github.com/apache/beam/pull/12452#discussion_r465753725



##
File path: sdks/python/apache_beam/typehints/typecheck_test_py3.py
##
@@ -92,23 +93,29 @@ def test_wrapper_pass_through(self):
 # We use a file to check the result because the MyDoFn instance passed is
 # not the same one that actually runs in the pipeline (it is serialized
 # here and deserialized in the worker).
-with tempfile.NamedTemporaryFile(mode='w+t') as f:
-  dofn = MyDoFn(f.name)
+
+fd, path = tempfile.mkstemp()
+try:
+  os.close(fd)

Review comment:
   WDYT about adding a comment with the JIRA reference in such places?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kkucharc merged pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read

2020-08-05 Thread GitBox


kkucharc merged pull request #12149:
URL: https://github.com/apache/beam/pull/12149


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] damgad commented on pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read

2020-08-05 Thread GitBox


damgad commented on pull request #12149:
URL: https://github.com/apache/beam/pull/12149#issuecomment-669180916


   @kkucharc Indeed, `Whitespace PreCommit` is not yet merged to master, so technically it shouldn't be here. It appeared because the seed job was run from the PR you linked.
   
   There's probably a formatting issue with trailing whitespaces, but you can disregard it and merge.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] damgad commented on a change in pull request #12444: Added a whitespace lint as part of python lint precommit

2020-08-05 Thread GitBox


damgad commented on a change in pull request #12444:
URL: https://github.com/apache/beam/pull/12444#discussion_r465674025



##
File path: .test-infra/jenkins/job_PreCommit_Whitespace.groovy
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import PrecommitJobBuilder
+
+PrecommitJobBuilder builder = new PrecommitJobBuilder(
+scope: this,
+nameBase: 'Whitespace',
+gradleTask: ':whitespacePreCommit',
+triggerPathPatterns: [
+  '.*\\.md$',
+  '.*build\\.gradle$',
+]

Review comment:
   Hey, no worries. The regex looks fine, and I've also checked that it works. FYI: by running the seed job from the PR you can see your changes in action on Jenkins. However, (practically) only committers can trigger that.
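   
   As a quick sanity check (sketched here for illustration, not part of the PR), the double-escaped Groovy strings reduce to these ordinary regexes:
   ```python
   import re

   # In the Groovy source, '.*\\.md$' is the regex '.*\.md$' once Groovy
   # string escaping is resolved; likewise for the build.gradle pattern.
   trigger_patterns = [r'.*\.md$', r'.*build\.gradle$']

   changed_files = ['website/README.md', 'sdks/java/build.gradle',
                    'sdks/python/setup.py']
   triggered = [
       f for f in changed_files
       if any(re.match(p, f) for p in trigger_patterns)
   ]
   assert triggered == ['website/README.md', 'sdks/java/build.gradle']
   ```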
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] kkucharc edited a comment on pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read

2020-08-05 Thread GitBox


kkucharc edited a comment on pull request #12149:
URL: https://github.com/apache/beam/pull/12149#issuecomment-669144965


   I can see that the tests are now passing 🎉  I see `LGTM` from @chamikaramj so I'm merging :) Thank you a lot @chamikaramj and @purbanow !
   PS: After writing this comment, `Run Whitespace PreCommit` appeared, but it seems to be something still in progress ([link](https://github.com/apache/beam/pull/12444)), so I believe this check appeared here by mistake.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] damgad closed pull request #12470: DISREGARD - test PR

2020-08-05 Thread GitBox


damgad closed pull request #12470:
URL: https://github.com/apache/beam/pull/12470


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] damgad opened a new pull request #12470: DISREGARD - test PR

2020-08-05 Thread GitBox


damgad opened a new pull request #12470:
URL: https://github.com/apache/beam/pull/12470


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   [Per-runner build status badge links for the Go, Java, and Python rows omitted.]

[GitHub] [beam] kkucharc commented on pull request #12149: [BEAM-9897] Add cross-language support to SnowflakeIO.Read

2020-08-05 Thread GitBox


kkucharc commented on pull request #12149:
URL: https://github.com/apache/beam/pull/12149#issuecomment-669144965


   I can see that the tests are now passing 🎉  I see `LGTM` from @chamikaramj so I'm merging :) Thank you a lot @chamikaramj and @purbanow !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



