(beam) branch master updated (00445adda91 -> 0ca3f19555e)

2024-10-01 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 00445adda91 Support string FQN as a way to add lineage information 
(#32613)
 add a0c9245e752 Fix writing raw messages to pubsub
 add 3d90cffe6b0 Add case for existing bytes object
 add 68263fcf507 Require bytes or strings.
 add 76a600d25b8 Fix formatting typo.
 add 79b534c8941 fix typo
 add 0ca3f19555e Merge pull request #32342 Fix writing raw messages to 
pubsub.

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/yaml/yaml_io.py | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)
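The "Require bytes or strings" commit indicates that raw Pub/Sub payloads must already be `bytes` or `str` when they are written. The following is a minimal sketch of such a check. It assumes UTF-8 for strings, and the helper name `to_raw_payload` is hypothetical; it is not the code in `yaml_io.py`.

```python
def to_raw_payload(value, encoding="utf-8"):
    """Coerce a raw message payload to bytes (hypothetical helper).

    Assumes bytes pass through unchanged, str is encoded with `encoding`,
    and any other type is rejected instead of being silently stringified.
    """
    if isinstance(value, bytes):
        # Already a bytes object: publish as-is.
        return value
    if isinstance(value, str):
        return value.encode(encoding)
    raise TypeError(
        f"Raw Pub/Sub payloads must be bytes or str, got {type(value).__name__}")
```

Rejecting other types up front surfaces the mistake at pipeline construction or processing time rather than producing unexpected bytes on the topic.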



(beam) branch master updated: [yaml] Preserve windowing for windowed input when using FileIO Java providers (#32586)

2024-09-30 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 9b85f718cb8 [yaml] Preserve windowing for windowed input when using 
FileIO Java providers (#32586)
9b85f718cb8 is described below

commit 9b85f718cb856c03132eaa48845954c8922bab71
Author: Jeff Kinard 
AuthorDate: Mon Sep 30 17:36:11 2024 -0400

[yaml] Preserve windowing for windowed input when using FileIO Java 
providers (#32586)
---
 .../sdk/io/csv/providers/CsvWriteTransformProvider.java | 17 +
 .../io/json/providers/JsonWriteTransformProvider.java   | 13 +++--
 2 files changed, 24 insertions(+), 6 deletions(-)
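Both providers in this commit apply the same rule: enable windowed writes only when the input's windowing strategy is not equal to `WindowingStrategy.globalDefault()`. The sketch below restates that rule in Python using a toy frozen dataclass in place of `WindowingStrategy`; its field names and default values are illustrative assumptions, not Beam's actual representation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyWindowingStrategy:
    # Illustrative stand-ins for fields that take part in equality;
    # Beam's real WindowingStrategy carries more state than this.
    window_fn: str = "GlobalWindows"
    trigger: str = "DefaultTrigger"
    allowed_lateness_ms: int = 0


GLOBAL_DEFAULT = ToyWindowingStrategy()


def use_windowed_writes(strategy: ToyWindowingStrategy) -> bool:
    """Mirror of the Java check: anything other than the global default."""
    return strategy != GLOBAL_DEFAULT
```

Because the comparison is full equality, a global window with a non-default trigger would also switch to windowed writes, not only inputs with a non-global window function.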

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
index f4d54c408cf..89e8211026b 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.commons.csv.CSVFormat;
 
@@ -134,10 +135,18 @@ public class CsvWriteTransformProvider
   if (configuration.getDelimiter() != null) {
 format = format.withDelimiter(configuration.getDelimiter().charAt(0));
   }
-  WriteFilesResult result =
-  input
-  .get(INPUT_ROWS_TAG)
-  .apply(CsvIO.writeRows(configuration.getPath(), format).withSuffix(""));
+
+  // Preserve input windowing
+  CsvIO.Write writeTransform =
+  CsvIO.writeRows(configuration.getPath(), format).withSuffix("");
+  if (!input
+  .get(INPUT_ROWS_TAG)
+  .getWindowingStrategy()
+  .equals(WindowingStrategy.globalDefault())) {
+writeTransform = writeTransform.withWindowedWrites();
+  }
+
+  WriteFilesResult result = input.get(INPUT_ROWS_TAG).apply(writeTransform);
   Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
   return PCollectionRowTuple.of(
   WRITE_RESULTS,
diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
index 9e030821e5c..a522d176fac 100644
--- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 
 /**
@@ -121,8 +122,16 @@ public class JsonWriteTransformProvider
 
 @Override
 public PCollectionRowTuple expand(PCollectionRowTuple input) {
-  WriteFilesResult result =
-  input.get(INPUT_ROWS_TAG).apply(JsonIO.writeRows(configuration.getPath()).withSuffix(""));
+  // Preserve input windowing
+  JsonIO.Write writeTransform = JsonIO.writeRows(configuration.getPath()).withSuffix("");
+  if (!input
+  .get(INPUT_ROWS_TAG)
+  .getWindowingStrategy()
+  .equals(WindowingStrategy.globalDefault())) {
+writeTransform = writeTransform.withWindowedWrites();
+  }
+
+  WriteFilesResult result = input.get(INPUT_ROWS_TAG).apply(writeTransform);
   Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
   return PCollectionRowTuple.of(
   WRITE_RESULTS,



(beam) 01/01: Merge pull request Better error message for incorrect pipeline options flags.

2024-09-18 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 475c98c8779b7f332ce10d56df61ba7652aa3ab7
Merge: 7daeadff032 e5f454b04b3
Author: Robert Bradshaw 
AuthorDate: Wed Sep 18 10:08:08 2024 -0700

Merge pull request Better error message for incorrect pipeline options 
flags.

 sdks/python/apache_beam/options/pipeline_options.py | 14 +++---
 1 file changed, 11 insertions(+), 3 deletions(-)




(beam) branch master updated (7daeadff032 -> 475c98c8779)

2024-09-18 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 7daeadff032 Restore release-essential references to 3.8 (#32499)
 add 2cd6fb8d758 Better error message for incorrect pipeline options flags.
 add e3f8c20eeea More conservative args parsing.
 add e5f454b04b3 Fix mypy strictness.
 new 475c98c8779 Merge pull request Better error message for incorrect 
pipeline options flags.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/options/pipeline_options.py | 14 +++---
 1 file changed, 11 insertions(+), 3 deletions(-)
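These commits improve the error reported for incorrect pipeline options flags and make argument parsing more conservative. The exact message Beam produces is not shown here. As a hedged illustration of the general technique (not Beam's implementation), `argparse`'s `parse_known_args` can collect unrecognized flags and `difflib.get_close_matches` can suggest the option the user probably meant; the flag names in the usage are examples only.

```python
import argparse
import difflib


def parse_or_explain(argv, known_flags):
    """Parse argv, raising a descriptive error for unrecognized flags (sketch)."""
    parser = argparse.ArgumentParser(allow_abbrev=False)
    for flag in known_flags:
        parser.add_argument(flag)
    args, unknown = parser.parse_known_args(argv)
    errors = []
    for arg in unknown:
        if not arg.startswith("--"):
            continue  # Conservative: only complain about flag-like tokens.
        name = arg.split("=", 1)[0]
        suggestions = difflib.get_close_matches(name, known_flags, n=1)
        hint = f" Did you mean {suggestions[0]}?" if suggestions else ""
        errors.append(f"Unrecognized option {name}.{hint}")
    if errors:
        raise ValueError(" ".join(errors))
    return args
```

For example, `parse_or_explain(["--runer=DirectRunner"], ["--runner", "--project"])` raises a `ValueError` that suggests `--runner`.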



(beam) branch master updated: Accept runner and options in ib.collect. (#32434)

2024-09-12 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 4ee26065d9b Accept runner and options in ib.collect. (#32434)
4ee26065d9b is described below

commit 4ee26065d9b4ea074d3d59d38cc4fbc5aa9dbfa5
Author: Robert Bradshaw 
AuthorDate: Thu Sep 12 11:56:41 2024 -0700

Accept runner and options in ib.collect. (#32434)
---
 .../display/pcoll_visualization_test.py|  2 +-
 .../runners/interactive/interactive_beam.py| 12 -
 .../interactive/non_interactive_runner_test.py | 30 ++
 .../runners/interactive/pipeline_fragment.py   | 12 +
 .../runners/interactive/recording_manager.py   | 17 ++--
 5 files changed, 64 insertions(+), 9 deletions(-)
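With this change, `ib.collect` forwards `runner` and `options` to `recording_manager.record(...)`, and only the PCollections not already computed are recorded (`uncomputed = set(pcolls) - set(computed.keys())`). The toy model below sketches that control flow. PCollections are modeled as names mapped to producer callables, a runner is any callable taking those producers plus an options dict, and the names `default_runner` and `toy_collect` are hypothetical; this is not the interactive Beam implementation.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

Producer = Callable[[], Iterable]


def default_runner(to_compute: Dict[str, Producer],
                   options: dict) -> Dict[str, List]:
    """Hypothetical default runner: evaluate each producer eagerly."""
    return {name: list(fn()) for name, fn in to_compute.items()}


def toy_collect(names: List[str],
                producers: Dict[str, Producer],
                cache: Dict[str, List],
                runner: Optional[Callable] = None,
                options: Optional[dict] = None) -> Tuple[List, ...]:
    """Return results for `names`, running only the ones not yet cached."""
    computed = {n: cache[n] for n in names if n in cache}
    uncomputed = set(names) - set(computed)
    if uncomputed:
        run = runner or default_runner
        results = run({n: producers[n] for n in uncomputed}, options or {})
        cache.update(results)  # Later calls reuse these, whatever the runner.
        computed.update(results)
    return tuple(computed[n] for n in names)
```

This mirrors the new test's intent: a second collection with a custom runner and options only runs the part of the graph that is not already cached.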

diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
index d34b966b0ef..7fc76feb749 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
@@ -66,7 +66,7 @@ class PCollectionVisualizationTest(unittest.TestCase):
 ie.current_env().track_user_pipelines()
 
 recording_manager = RecordingManager(self._p)
-recording = recording_manager.record([self._pcoll], 5, 5)
+recording = recording_manager.record([self._pcoll], max_n=5, max_duration=5)
 self._stream = recording.stream(self._pcoll)
 
   def test_pcoll_visualization_generate_unique_display_id(self):
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 5c76f9c228c..0e170eb0f50 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -880,6 +880,8 @@ def collect(
 n='inf',
 duration='inf',
 include_window_info=False,
+runner=None,
+options=None,
 force_compute=False,
 force_tuple=False):
   """Materializes the elements from a PCollection into a Dataframe.
@@ -896,6 +898,9 @@ def collect(
 a string duration. Default 'inf'.
 include_window_info: (optional) if True, appends the windowing information
 to each row. Default False.
+runner: (optional) the runner with which to compute the results
+options: (optional) any additional pipeline options to use to compute the
+results
 force_compute: (optional) if True, forces recomputation rather than using
 cached PCollections
 force_tuple: (optional) if True, return a 1-tuple of results rather than
@@ -969,7 +974,12 @@ def collect(
   uncomputed = set(pcolls) - set(computed.keys())
   if uncomputed:
 recording = recording_manager.record(
-uncomputed, max_n=n, max_duration=duration, force_compute=force_compute)
+uncomputed,
+max_n=n,
+max_duration=duration,
+runner=runner,
+options=options,
+force_compute=force_compute)
 
 try:
   for pcoll in uncomputed:
diff --git a/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py
index 47adf7b36b3..f7fd052fecc 100644
--- a/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py
@@ -257,6 +257,36 @@ class NonInteractiveRunnerTest(unittest.TestCase):
 df_expected['cube'],
 ib.collect(df['cube'], n=10).reset_index(drop=True))
 
+  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+  def test_new_runner_and_options(self):
+class MyRunner(beam.runners.PipelineRunner):
+  run_count = 0
+
+  @classmethod
+  def run_pipeline(cls, pipeline, options):
+assert options._all_options['my_option'] == 123
+cls.run_count += 1
+return direct_runner.DirectRunner().run_pipeline(pipeline, options)
+
+clear_side_effect()
+p = beam.Pipeline(direct_runner.DirectRunner())
+
+# Initial collection runs the pipeline.
+pcoll1 = p | beam.Create(['a', 'b', 'c']) | beam.Map(cause_side_effect)
+collected1 = ib.collect(pcoll1)
+self.assertEqual(set(collected1[0]), set(['a', 'b', 'c']))
+self.assertEqual(count_side_effects('a'), 1)
+
+# Using the PCollection uses the cache with a different runner and options.
+pcoll2 = pcoll1 | beam.Map(str.upper)
+collected2 = ib.collect(
+pcoll2,
+runner=MyRunner(),
+options=beam.opti

(beam) branch master updated (1ee2f6bff4a -> a7852d9ddb1)

2024-09-06 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 1ee2f6bff4a Bump jinja2 version to resolve vulnerability (#32403)
 add 50a6cd2b580 Add tests of using ib.collect(...) without 
InteractiveRunner.
 add d5dc2d5 Allow ib.collect(...) to take multiple PCollections.
 new a7852d9ddb1 Merge pull request #32392 Allow ib.collect(...) to take 
multiple PCollections.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runners/interactive/interactive_beam.py|  96 +---
 .../interactive/non_interactive_runner_test.py | 262 +
 2 files changed, 320 insertions(+), 38 deletions(-)
 create mode 100644 
sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py



(beam) branch master updated: Allow ib.collect to work with non-interactive runners. (#32383)

2024-09-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 2b02fd3cc94 Allow ib.collect to work with non-interactive runners. 
(#32383)
2b02fd3cc94 is described below

commit 2b02fd3cc94683be9b58f9049346a984b0d20ff6
Author: Robert Bradshaw 
AuthorDate: Tue Sep 3 15:39:22 2024 -0700

Allow ib.collect to work with non-interactive runners. (#32383)
---
 .../runners/interactive/interactive_beam.py| 16 +--
 .../runners/interactive/pipeline_fragment.py   | 50 +++---
 .../runners/interactive/pipeline_fragment_test.py  |  8 
 .../runners/interactive/recording_manager.py   |  9 +++-
 4 files changed, 56 insertions(+), 27 deletions(-)
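The `force_compute` flag introduced here bypasses the `computed_pcollections` check, so a PCollection that was already recorded is computed again. A minimal memoization sketch of that behavior follows; the `ToyRecorder` class is hypothetical and not part of Beam.

```python
from typing import Callable, Dict, List


class ToyRecorder:
    """Caches computed results by name; force_compute skips the cache."""

    def __init__(self) -> None:
        self._cache: Dict[str, List] = {}
        self.compute_count = 0

    def collect(self, name: str, compute: Callable[[], List],
                force_compute: bool = False) -> List:
        # Mirrors `if pcoll in computed_pcollections and not force_compute`.
        if name in self._cache and not force_compute:
            return self._cache[name]
        self.compute_count += 1
        self._cache[name] = list(compute())
        return self._cache[name]
```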

diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index ee1b37cd704..a1cea2637cb 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -875,7 +875,12 @@ def show(
 
 
 @progress_indicated
-def collect(pcoll, n='inf', duration='inf', include_window_info=False):
+def collect(
+pcoll,
+n='inf',
+duration='inf',
+include_window_info=False,
+force_compute=False):
   """Materializes the elements from a PCollection into a Dataframe.
 
   This reads each element from file and reads only the amount that it needs
@@ -889,6 +894,8 @@ def collect(pcoll, n='inf', duration='inf', include_window_info=False):
 a string duration. Default 'inf'.
 include_window_info: (optional) if True, appends the windowing information
 to each row. Default False.
+force_compute: (optional) if True, forces recomputation rather than using
+cached PCollections
 
   For example::
 
@@ -938,7 +945,7 @@ def collect(pcoll, n='inf', duration='inf', include_window_info=False):
   user_pipeline, create_if_absent=True)
 
   # If already computed, directly read the stream and return.
-  if pcoll in ie.current_env().computed_pcollections:
+  if pcoll in ie.current_env().computed_pcollections and not force_compute:
 pcoll_name = find_pcoll_name(pcoll)
 elements = list(
 recording_manager.read(pcoll_name, pcoll, n, duration).read())
@@ -947,7 +954,10 @@ def collect(pcoll, n='inf', duration='inf', include_window_info=False):
 include_window_info=include_window_info,
 element_type=element_type)
 
-  recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
+  recording = recording_manager.record([pcoll],
+   max_n=n,
+   max_duration=duration,
+   force_compute=force_compute)
 
   try:
 elements = list(recording.stream(pcoll).read())
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py b/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
index 0f6906841a8..5b385d3f8a0 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
@@ -22,7 +22,9 @@ For internal use only; no backwards-compatibility guarantees.
 import apache_beam as beam
 from apache_beam.pipeline import AppliedPTransform
 from apache_beam.pipeline import PipelineVisitor
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import pipeline_instrument as instr
 from apache_beam.testing.test_stream import TestStream
 
 
@@ -65,7 +67,6 @@ class PipelineFragment(object):
 # into a pipeline fragment that is later run by the underlying runner.
 self._runner_pipeline = self._build_runner_pipeline()
 _, self._context = self._runner_pipeline.to_runner_api(return_context=True)
-from apache_beam.runners.interactive import pipeline_instrument as instr
 self._runner_pcoll_to_id = instr.pcoll_to_pcoll_id(
 self._runner_pipeline, self._context)
 # Correlate components in the runner pipeline to components in the user
@@ -104,23 +105,42 @@ class PipelineFragment(object):
 
   def run(self, display_pipeline_graph=False, use_cache=True, blocking=False):
 """Shorthand to run the pipeline fragment."""
+fragment = self.deduce_fragment()
 from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
-if not isinstance(self._runner_pipeline.runner, InteractiveRunner):
-  raise RuntimeError(
-  'Please specify InteractiveRunner when creating '
-  

(beam) 01/01: Merge pull request #32330 Use proper coders in interactive cache.

2024-09-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f06df5dabf07eb349668169ee734359b62ccfc63
Merge: 6baba928348 f8bda18c737
Author: Robert Bradshaw 
AuthorDate: Tue Sep 3 11:39:28 2024 -0700

Merge pull request #32330 Use proper coders in interactive cache.

Formerly the coder used was always a URL-escaped pickling[1] of windowed
values. This is quite inefficient in time and space.

The default (text) sink is modified to use base64 encoding to avoid
embedded newlines, and also has compression by default (which helps enormously
in the case of common windowing and timestamp metadata).

Also add compression to account for base64 expansion and (often) highly
repetitive windowing metadata.

[1] The FastPrimitivesCoder is targeted at efficiently coding elements, not
windows or windowed values.

 .../runners/interactive/cache_manager.py   | 50 ++
 .../runners/interactive/cache_manager_test.py  |  7 ++-
 .../runners/interactive/interactive_runner_test.py | 46 +++-
 .../runners/interactive/recording_manager_test.py  |  2 +-
 .../apache_beam/runners/interactive/utils.py   |  3 ++
 5 files changed, 92 insertions(+), 16 deletions(-)
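The commit message describes base64-encoding each coded element, so that no line in the text sink contains an embedded newline, and compressing the output. The sketch below illustrates why that round-trips safely and why compression offsets base64's expansion on repetitive metadata. It uses the standard `base64` and `gzip` modules as assumed stand-ins and does not reproduce `cache_manager.py`.

```python
import base64
import gzip
from typing import Iterable, List


def encode_lines(encoded_elements: Iterable[bytes]) -> bytes:
    """Write one base64 line per already-coded element, then gzip the text."""
    text = b"".join(base64.b64encode(e) + b"\n" for e in encoded_elements)
    return gzip.compress(text)


def decode_lines(blob: bytes) -> List[bytes]:
    """Invert encode_lines: gunzip, split on newlines, base64-decode each line."""
    return [base64.b64decode(line)
            for line in gzip.decompress(blob).splitlines()]
```

Elements containing raw `b"\n"` bytes survive the round trip because base64 output never contains a newline.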



(beam) branch master updated: [YAML] Better errors for unconsumed error outputs. (#32341)

2024-09-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6baba928348 [YAML] Better errors for unconsumed error outputs. (#32341)
6baba928348 is described below

commit 6baba928348bdd02f42dcbdd2005d69398e6a58f
Author: Robert Bradshaw 
AuthorDate: Tue Sep 3 11:38:23 2024 -0700

[YAML] Better errors for unconsumed error outputs. (#32341)
---
 sdks/python/apache_beam/yaml/yaml_transform.py| 9 -
 sdks/python/apache_beam/yaml/yaml_transform_unit_test.py  | 3 ++-
 website/www/site/content/en/documentation/sdks/yaml-errors.md | 5 -
 3 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index ffef9bbcd8f..ab86a2aaff5 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -882,7 +882,14 @@ def ensure_errors_consumed(spec):
   consumed.add(scope.get_transform_id_and_output_name(input))
 for error_pcoll, t in to_handle.items():
   if error_pcoll not in consumed:
-raise ValueError(f'Unconsumed error output for {identify_object(t)}.')
+config = t.get('config', t)
+transform_name = t.get('name', t.get('type'))
+error_output_name = config['error_handling']['output']
+raise ValueError(
+f'Unconsumed error output for {identify_object(t)}. '
+f'The output named {transform_name}.{error_output_name} '
+'must be used as an input to some other transform. '
+'See https://beam.apache.org/documentation/sdks/yaml-errors')
   return spec
 
 
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
index c4b6899329a..8c4b00351b2 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
@@ -901,7 +901,8 @@ class MainTest(unittest.TestCase):
 spec['transforms'] = [
 normalize_inputs_outputs(t) for t in spec['transforms']
 ]
-with self.assertRaisesRegex(ValueError, r"Unconsumed error.*"):
+with self.assertRaisesRegex(ValueError,
+r"Unconsumed error.*MyTransform.errors"):
   ensure_errors_consumed(spec)
 
   def test_ensure_errors_consumed_in_transform(self):
diff --git a/website/www/site/content/en/documentation/sdks/yaml-errors.md b/website/www/site/content/en/documentation/sdks/yaml-errors.md
index caa3ad9af24..8c0d9f06ade 100644
--- a/website/www/site/content/en/documentation/sdks/yaml-errors.md
+++ b/website/www/site/content/en/documentation/sdks/yaml-errors.md
@@ -32,7 +32,10 @@ recording them for later off-line analysis.  This is often called the
 "dead letter queue" pattern.
 
 Beam YAML has special support for this pattern if the transform supports a
-`error_handling` config parameter with an `output` field.  For example,
+`error_handling` config parameter with an `output` field.
+The `output` parameter is a name that must be referenced as an input to
+another transform that will process the errors (e.g. by writing them out).
+For example,
 the following code will write all "good" processed records to one file and
 any "bad" records to a separate file.
 



(beam) branch master updated (6baba928348 -> f06df5dabf0)

2024-09-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6baba928348 [YAML] Better errors for unconsumed error outputs. (#32341)
 add f8bda18c737 Use proper coders in interactive cache.
 new f06df5dabf0 Merge pull request #32330 Use proper coders in interactive 
cache.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runners/interactive/cache_manager.py   | 50 ++
 .../runners/interactive/cache_manager_test.py  |  7 ++-
 .../runners/interactive/interactive_runner_test.py | 46 +++-
 .../runners/interactive/recording_manager_test.py  |  2 +-
 .../apache_beam/runners/interactive/utils.py   |  3 ++
 5 files changed, 92 insertions(+), 16 deletions(-)



(beam) branch master updated (64b4aa6fdde -> b875fd490d9)

2024-09-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 64b4aa6fdde Memoize toString() for MetricKey and MetricName (#32379)
 add 9771a425655 Add WindowedvalueParam option to DoFns.
 add b875fd490d9 Merge pull request #32305 Add WindowedValueParam option to 
DoFns.

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/pipeline_test.py   | 14 ++
 sdks/python/apache_beam/runners/common.py  |  6 ++
 sdks/python/apache_beam/transforms/core.py |  2 ++
 3 files changed, 22 insertions(+)



(beam) branch master updated: Add doc string and signature to generated Python wrappers (#32337)

2024-08-27 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 9e3aeca6123 Add doc string and signature to generated Python wrappers 
(#32337)
9e3aeca6123 is described below

commit 9e3aeca6123463146f7ec8cd4fec855fb912d25c
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Tue Aug 27 19:05:44 2024 -0400

Add doc string and signature to generated Python wrappers (#32337)
---
 .../transforms/external_transform_provider.py  | 50 --
 .../external_transform_provider_it_test.py | 11 ++---
 sdks/python/gen_xlang_wrappers.py  |  3 +-
 3 files changed, 45 insertions(+), 19 deletions(-)
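The new `_generate_signature` builds an `inspect.Signature` from the transform's configuration schema, using string annotations of the form `"<type>: <description>"`. The standard-library-only sketch below applies the same idea to a hand-written field dictionary; the `make_signature` and `ToyWrapper` names are hypothetical. Assigning the result to the class's `__signature__` attribute is what makes `inspect.signature` report it.

```python
import inspect
from inspect import Parameter, Signature
from typing import Dict, List, Tuple


def make_signature(fields: Dict[str, Tuple[str, str]]) -> Signature:
    """Build a Signature whose annotations read '<type>: <description>'."""
    params: List[Parameter] = []
    for name, (type_name, description) in fields.items():
        annotation = type_name
        if description:
            annotation = annotation + f": {description}"
        params.append(
            Parameter(name, Parameter.POSITIONAL_OR_KEYWORD, annotation=annotation))
    return Signature(params)


class ToyWrapper:
    """Hypothetical generated wrapper; inspect.getdoc() returns this text."""

    def __init__(self, **config):
        self.config = config


# inspect.signature() consults __signature__ before introspecting __init__.
ToyWrapper.__signature__ = make_signature({
    "query": ("Optional[str]", "The SQL query to run."),
    "limit": ("int", ""),
})

print(inspect.signature(ToyWrapper))
```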

diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 67adda5aec0..117c7f7c9b9 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -18,6 +18,8 @@
 import logging
 import re
 from collections import namedtuple
+from inspect import Parameter
+from inspect import Signature
 from typing import Dict
 from typing import List
 from typing import Tuple
@@ -58,6 +60,22 @@ def get_config_with_descriptions(
   return fields_with_descriptions
 
 
+def _generate_signature(schematransform: SchemaTransformsConfig) -> Signature:
+  schema = named_tuple_to_schema(schematransform.configuration_schema)
+  descriptions = schematransform.configuration_schema._field_descriptions
+  params: List[Parameter] = []
+  for field in schema.fields:
+annotation = str(typing_from_runner_api(field.type))
+description = descriptions[field.name]
+if description:
+  annotation = annotation + f": {description}"
+params.append(
+Parameter(
+field.name, Parameter.POSITIONAL_OR_KEYWORD, annotation=annotation))
+
+  return Signature(params)
+
+
 class ExternalTransform(PTransform):
   """Template for a wrapper class of an external SchemaTransform
 
@@ -69,7 +87,6 @@ class ExternalTransform(PTransform):
   # These attributes need to be set when
   # creating an ExternalTransform type
   default_expansion_service = None
-  description: str = ""
   identifier: str = ""
   configuration_schema: Dict[str, ParamInfo] = {}
 
@@ -138,18 +155,20 @@ class ExternalTransformProvider:
   ...   'beam:schematransform:org.apache.beam:bigquery_storage_read:v1')
   >>> provider.BigqueryStorageRead
 
-  To know more about the usage of a given transform, take a look at the
-  `description` attribute. This returns some documentation IF the underlying
-  SchemaTransform provides any.
-  >>> provider.BigqueryStorageRead.description
+  You can inspect the transform's documentation to know more about it. This
+  returns some documentation only IF the underlying SchemaTransform
+  implementation provides any.
+  >>> import inspect
+  >>> inspect.getdoc(provider.BigqueryStorageRead)
 
-  Similarly, the `configuration_schema` attribute returns information about the
+  Similarly, you can inspect the transform's signature to know more about its
   parameters, including their names, types, and any documentation that the
   underlying SchemaTransform may provide:
-  >>> provider.BigqueryStorageRead.configuration_schema
-  {'query': ParamInfo(type=typing.Optional[str], description='The SQL query to
-  be executed to read from the BigQuery table.', original_name='query'),
-  'row_restriction': ParamInfo(type=typing.Optional[str]...}
+  >>> inspect.signature(provider.BigqueryStorageRead)
+  (query: 'typing.Union[str, NoneType]: The SQL query to be executed to...',
+  row_restriction: 'typing.Union[str, NoneType]: Read only rows that match...',
+  selected_fields: 'typing.Union[typing.Sequence[str], NoneType]: Read ...',
+  table_spec: 'typing.Union[str, NoneType]: The fully-qualified name of ...')
 
   The retrieved external transform can be used as a normal PTransform like so::
 
@@ -213,14 +232,19 @@ class ExternalTransformProvider:
 skipped_urns.append(identifier)
 continue
 
-  self._transforms[identifier] = type(
-  name, (ExternalTransform, ),
+  transform = type(
+  name,
+  (ExternalTransform, ),
   dict(
   identifier=identifier,
   default_expansion_service=service,
   schematransform=config,
-  description=config.description,
+  # configuration_schema is used by the auto-wrapper generator
   configu

(beam) 01/01: Merge pull request #32303 Preserve numeric string literals when reading from json.

2024-08-27 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit bc80e9fdbb21251fd8bf162b9c9ba980b2ec8059
Merge: 857eccedc55 679e5cc6ff8
Author: Robert Bradshaw 
AuthorDate: Tue Aug 27 15:52:36 2024 -0700

Merge pull request #32303 Preserve numeric string literals when reading 
from json.

 CHANGES.md|  5 +
 sdks/python/apache_beam/io/textio.py  | 14 --
 sdks/python/apache_beam/io/textio_test.py | 30 ++
 3 files changed, 47 insertions(+), 2 deletions(-)
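Per the commit title, the JSON reader should keep string values that merely look like numbers (for example `"02139"`) as strings. The sketch below illustrates that failure mode, assuming the problem is type inference applied after parsing. The standard `json` module already preserves such strings, while the hypothetical `naive_infer` helper stands in for aggressive inference (such as dtype inference in a dataframe reader). It is not code from `textio.py`.

```python
import json


def naive_infer(value):
    """Hypothetical aggressive inference: numeric-looking strings become numbers."""
    if not isinstance(value, str):
        return value
    for convert in (int, float):
        try:
            return convert(value)
        except ValueError:
            pass
    return value


record = json.loads('{"zip": "02139", "count": 7}')
# The json module keeps quoted values as str, so "02139" survives intact.
preserved = record
# Re-inferring types afterwards silently drops the leading zero.
inferred = {key: naive_infer(value) for key, value in record.items()}
```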



(beam) branch master updated (857eccedc55 -> bc80e9fdbb2)

2024-08-27 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 857eccedc55 Refactor PubsubIO Lineage metrics to work with all runners 
(#32319)
 add 142e39250db Preserve numeric string literals when reading from json.
 add 679e5cc6ff8 Add a test.
 new bc80e9fdbb2 Merge pull request #32303 Preserve numeric string literals 
when reading from json.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.md|  5 +
 sdks/python/apache_beam/io/textio.py  | 14 --
 sdks/python/apache_beam/io/textio_test.py | 30 ++
 3 files changed, 47 insertions(+), 2 deletions(-)



(beam) 01/01: Merge pull request #32301 Rename delimiter to sep to pass to pandas.

2024-08-23 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e24a0c473fd238ccfe4f9d1c27e2e65cae2dd72f
Merge: 1e80815d12d c77460e32b9
Author: Robert Bradshaw 
AuthorDate: Fri Aug 23 11:50:32 2024 -0700

Merge pull request #32301 Rename delimiter to sep to pass to pandas.

 sdks/python/apache_beam/yaml/standard_io.yaml |  4 +--
 sdks/python/apache_beam/yaml/tests/tsv.yaml   | 49 +++
 2 files changed, 51 insertions(+), 2 deletions(-)
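This fix renames the YAML-level `delimiter` option to `sep` before passing it to pandas. The following hedged sketch shows that kind of key translation with a hypothetical `to_reader_kwargs` helper, then parses tab-separated text with the standard `csv` module (used instead of pandas so the example is self-contained).

```python
import csv
import io
from typing import Any, Dict, List

# Hypothetical mapping from user-facing config names to reader keyword names.
RENAMES = {"delimiter": "sep"}


def to_reader_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Rename config keys to the keyword names the reader expects."""
    return {RENAMES.get(k, k): v for k, v in config.items()}


def read_rows(text: str, sep: str = ",") -> List[List[str]]:
    # csv.reader calls this concept `delimiter`; pandas.read_csv's primary
    # keyword for it is `sep`.
    return list(csv.reader(io.StringIO(text), delimiter=sep))
```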



(beam) branch master updated (1e80815d12d -> e24a0c473fd)

2024-08-23 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 1e80815d12d Parallelize building wheels per language version (#32297)
 add c77460e32b9 Rename delimiter to sep to pass to pandas.
 new e24a0c473fd Merge pull request #32301 Rename delimiter to sep to pass 
to pandas.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/yaml/standard_io.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/{csv.yaml => tsv.yaml} | 2 ++
 2 files changed, 4 insertions(+), 2 deletions(-)
 copy sdks/python/apache_beam/yaml/tests/{csv.yaml => tsv.yaml} (96%)



(beam) 01/01: Merge pull request #31856 Add ErrorHandler DLQ API to Python.

2024-08-21 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b3a874f476646676bf7eb909cae738a696f36303
Merge: 3fb4fd0f6ea 36e5eff40af
Author: Robert Bradshaw 
AuthorDate: Wed Aug 21 00:27:38 2024 -0700

Merge pull request #31856 Add ErrorHandler DLQ API to Python.

 sdks/python/apache_beam/pipeline.py|   7 +
 sdks/python/apache_beam/transforms/core.py |  23 +++-
 .../apache_beam/transforms/error_handling.py   | 126 ++
 .../apache_beam/transforms/error_handling_test.py  | 148 +
 4 files changed, 303 insertions(+), 1 deletion(-)



(beam) branch master updated (3fb4fd0f6ea -> b3a874f4766)

2024-08-21 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 3fb4fd0f6ea Bump github.com/aws/aws-sdk-go-v2/feature/s3/manager in /sdks (#32266)
 add 917e99670ed Add ErrorHandler pattern to Python.
 add c4be92fd769 Add with_error_handler to ParDo (Map, FlatMap, etc.)
 add 34e28f396ea Add collecting error handler.
 add b69f4d8eb45 Add test stanza and other lint fixes.
 add daf28cdf979 Fix typo.
 add 5141f14503d Fix typo.
 add 049e4b3b6b9 Merge branch 'master' into error-handler
 add 36e5eff40af Add test of with_exception_handling side effects.
 new b3a874f4766 Merge pull request #31856 Add ErrorHandler DLQ API to Python.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/pipeline.py|   7 +
 sdks/python/apache_beam/transforms/core.py |  23 +++-
 .../apache_beam/transforms/error_handling.py   | 126 ++
 .../apache_beam/transforms/error_handling_test.py  | 148 +
 4 files changed, 303 insertions(+), 1 deletion(-)
 create mode 100644 sdks/python/apache_beam/transforms/error_handling.py
 create mode 100644 sdks/python/apache_beam/transforms/error_handling_test.py



(beam) branch master updated (254519b857a -> 4365f73cbe3)

2024-08-20 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 254519b857a Add Lineage metrics to KafkaIO (#32170)
 add 89b1a7f2028 [yaml] Fix PubSub error message
 new 4365f73cbe3 Merge pull request #32093 [yaml] Fix PubSub error message

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/yaml/yaml_io.py | 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)



(beam) 01/01: Merge pull request #32093 [yaml] Fix PubSub error message

2024-08-20 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4365f73cbe3ef860126ec9cca5e32e22869cb72f
Merge: 254519b857a 89b1a7f2028
Author: Robert Bradshaw 
AuthorDate: Tue Aug 20 12:33:46 2024 -0700

Merge pull request #32093 [yaml] Fix PubSub error message

 sdks/python/apache_beam/yaml/yaml_io.py | 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)



(beam) 01/01: Merge pull request #32141 Add basic testing for yaml join docs.

2024-08-19 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 38dfbd4c35e9a9e528bb905b9690666ec24dc785
Merge: 6b4a7a5d73e 7f1c7f4d8df
Author: Robert Bradshaw 
AuthorDate: Mon Aug 19 10:57:50 2024 -0700

Merge pull request #32141 Add basic testing for yaml join docs.

 sdks/python/apache_beam/yaml/readme_test.py| 39 ++
 sdks/python/apache_beam/yaml/yaml_join.py  |  3 +-
 .../content/en/documentation/sdks/yaml-join.md |  2 +-
 3 files changed, 36 insertions(+), 8 deletions(-)



(beam) branch master updated (6b4a7a5d73e -> 38dfbd4c35e)

2024-08-19 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6b4a7a5d73e feat: optimize Spanner changestream metadata table (#32213)
 add e2bf5d6dbe3 Add basic testing for yaml join docs.
 add 7f1c7f4d8df make linter happy
 new 38dfbd4c35e Merge pull request #32141 Add basic testing for yaml join docs.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/yaml/readme_test.py| 39 ++
 sdks/python/apache_beam/yaml/yaml_join.py  |  3 +-
 .../content/en/documentation/sdks/yaml-join.md |  2 +-
 3 files changed, 36 insertions(+), 8 deletions(-)



(beam) branch master updated: Create Beam YAML Join documentation (#31494)

2024-08-09 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f73a6d1570a Create Beam YAML Join documentation (#31494)
f73a6d1570a is described below

commit f73a6d1570acc2945db4d80206dd86d6054f2ac2
Author: Timothy Itodo 
AuthorDate: Fri Aug 9 14:06:02 2024 -0500

Create Beam YAML Join documentation (#31494)
---
 .../content/en/documentation/sdks/yaml-join.md | 182 +
 .../layouts/partials/section-menu/en/sdks.html |   1 +
 2 files changed, 183 insertions(+)

diff --git a/website/www/site/content/en/documentation/sdks/yaml-join.md b/website/www/site/content/en/documentation/sdks/yaml-join.md
new file mode 100644
index 000..d207926ff99
--- /dev/null
+++ b/website/www/site/content/en/documentation/sdks/yaml-join.md
@@ -0,0 +1,182 @@
+---
+type: languages
+title: "Apache Beam YAML Join"
+---
+
+
+# Beam YAML Join
+
+Beam YAML can join two or more inputs on specified columns. For example, the
+following pipeline joins the First Input PCollection with the Second Input
+PCollection wherever col1 in First Input equals col2 in Second Input.
+
+```
+- type: Join
+  input:
+    input1: First Input
+    input2: Second Input
+  config:
+    equalities:
+      - input1: col1
+        input2: col2
+```
+
+When joining multiple inputs on a single column that has the same name in every
+input, you can use the following shorthand syntax:
+
+```
+- type: Join
+  input:
+    input1: First Input
+    input2: Second Input
+    input3: Third Input
+  config:
+    equalities: col
+```
+
+## Join Types
+
+When using the Join transform, you can specify the type of join to perform on
+the inputs with the "type" configuration. If no join type is specified, all
+inputs are joined using an inner join. The supported join types are:
+
+| Join Type | YAML Keyword |
+| --------- | ------------ |
+| Inner Join | inner |
+| Left Outer Join | left |
+| Right Outer Join | right |
+| Full Outer Join | outer |
+
+The following example joins two inputs using an inner join on the specified
+equalities:
+
+```
+- type: Join
+  input:
+    input1: First Input
+    input2: Second Input
+  config:
+    type: inner
+    equalities:
+      - input1: col1
+        input2: col1
+```
+
+
+The following example joins two inputs using a left outer join on the specified
+equalities. In this case, all rows from input1 are kept because input1 is the
+left input. Joins are applied in the order in which they are listed in
+equalities.
+
+```
+- type: Join
+  input:
+    input1: First Input
+    input2: Second Input
+  config:
+    type: left
+    equalities:
+      - input1: col1
+        input2: col1
+```
+
+The following example joins three inputs using a full outer join on the
+specified equalities:
+
+```
+- type: Join
+  input:
+    input1: First Input
+    input2: Second Input
+    input3: Third Input
+  config:
+    type: outer
+    equalities:
+      - input1: col1
+        input2: col1
+      - input2: col2
+        input3: col2
+```
+
+To combine join types, list under "outer" the inputs whose rows should all be
+kept. The following example joins input1 with input2 using a right outer join,
+since input2 is on the right side, and joins input2 with input3 using a left
+outer join, since input2 is on the left side.
+
+```
+- type: Join
+  input:
+    input1: First Input
+    input2: Second Input
+    input3: Third Input
+  config:
+    type:
+      outer:
+        - input2
+    equalities:
+      - input1: col1
+        input2: col1
+      - input2: col2
+        input3: col2
+```
+
+## Fields
+
+By default, the join transform includes all columns from all input tables. If
+column names clash, it's best to rename them explicitly; otherwise, the system
+deduplicates the names by adding a numeric suffix.
+
+To choose which columns to output, or to customize the output column names, use
+the "fields" configuration.
+
+To specify which columns to output from an input, use the input reference as the
+configuration key and a list of desired columns as the configuration value. The
+following example outputs col1 from input1, col2 and col3 from input2, and all
+the columns from input3. If there is a name clash, a numeric suffix is appended
+to avoid duplicate names.
+
+```
+- type: Join
+  input:
+input1: First Input
+input2: Second Input
+input3: Third Input
+  config:
+equalities: col1
+fields:
+  input1: [col1]
+  input2: [col2, col3]
+```
+
+To rename a column in the output, create a mapping for the input with the key as
+the new column name and the value as the original column name. The following
+example maps col1 from input3 to the column name "renamed_col1":
+
+```
+- type: Join
+  input:
+    input1: First Input
+    input2: Second Input
+    input3: Third Input
+  config:
+    equalities: col1
+    fields:
+  
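
The join types in the table above can be illustrated outside of Beam with a small in-memory sketch. The class `JoinSketch` and its `join` method are hypothetical and do not reflect how Beam YAML implements Join. The sketch makes these simplifying assumptions:

- Rows are modeled as `Map<String, Object>`.
- Only a single two-input equality is supported.
- On a column-name clash, the left value is kept. Beam YAML instead deduplicates clashing names with a numeric suffix, as described in the Fields section.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of inner/left/right/outer equi-join semantics; not Beam's implementation. */
public class JoinSketch {

  /**
   * Joins rows where left.get(leftCol) equals right.get(rightCol). The type is one of
   * "inner", "left", "right" or "outer"; any other value behaves like "inner" here.
   */
  public static List<Map<String, Object>> join(
      List<Map<String, Object>> left, String leftCol,
      List<Map<String, Object>> right, String rightCol,
      String type) {
    boolean keepLeft = type.equals("left") || type.equals("outer");
    boolean keepRight = type.equals("right") || type.equals("outer");
    List<Map<String, Object>> out = new ArrayList<>();
    Set<Integer> matchedRight = new HashSet<>();

    for (Map<String, Object> l : left) {
      Object key = l.get(leftCol);
      boolean matched = false;
      for (int i = 0; i < right.size(); i++) {
        if (key != null && key.equals(right.get(i).get(rightCol))) {
          out.add(merge(l, right.get(i)));
          matchedRight.add(i);
          matched = true;
        }
      }
      // Unmatched left rows survive only for left and full outer joins.
      if (!matched && keepLeft) {
        out.add(merge(l, Map.of()));
      }
    }
    // Unmatched right rows survive only for right and full outer joins.
    if (keepRight) {
      for (int i = 0; i < right.size(); i++) {
        if (!matchedRight.contains(i)) {
          out.add(merge(Map.of(), right.get(i)));
        }
      }
    }
    return out;
  }

  /** Copies the left columns, then any right columns whose names are not already present. */
  private static Map<String, Object> merge(Map<String, Object> l, Map<String, Object> r) {
    Map<String, Object> row = new LinkedHashMap<>(l);
    r.forEach(row::putIfAbsent);
    return row;
  }
}
```

The behavior differs by join type:

- `inner` emits only matching pairs.
- `left` and `right` also keep the unmatched rows from that side.
- `outer` keeps unmatched rows from both sides.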

(beam) 01/01: Merge pull request #32112 Upgrade Apache Beam to use Cython 3.x.

2024-08-09 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 49e98e59fd20124a287ac1845002a0b9499cd3cb
Merge: 1c0cfa1ccba 00de74030e2
Author: Robert Bradshaw 
AuthorDate: Fri Aug 9 11:01:55 2024 -0700

Merge pull request #32112 Upgrade Apache Beam to use Cython 3.x.

 sdks/python/apache_beam/coders/coder_impl.py  | 2 --
 sdks/python/apache_beam/coders/stream.pyx | 2 --
 sdks/python/apache_beam/metrics/cells.py  | 2 --
 sdks/python/apache_beam/metrics/execution.py  | 2 --
 sdks/python/apache_beam/metrics/monitoring_infos.py   | 2 --
 sdks/python/apache_beam/runners/common.py | 1 -
 sdks/python/apache_beam/runners/worker/logger.py  | 2 --
 sdks/python/apache_beam/runners/worker/opcounters.py  | 2 --
 sdks/python/apache_beam/runners/worker/operations.py  | 2 --
 sdks/python/apache_beam/runners/worker/statesampler_fast.pyx  | 4 +---
 sdks/python/apache_beam/testing/fast_test_utils.pyx   | 2 --
 sdks/python/apache_beam/transforms/cy_combiners.py| 2 --
 .../apache_beam/transforms/cy_dataflow_distribution_counter.pyx   | 2 --
 sdks/python/apache_beam/transforms/stats.py   | 2 --
 sdks/python/apache_beam/utils/counters.py | 1 -
 sdks/python/apache_beam/utils/windowed_value.py   | 2 --
 sdks/python/container/base_image_requirements_manual.txt  | 4 +---
 sdks/python/container/py310/base_image_requirements.txt   | 2 +-
 sdks/python/container/py311/base_image_requirements.txt   | 2 +-
 sdks/python/container/py312/base_image_requirements.txt   | 2 +-
 sdks/python/container/py38/base_image_requirements.txt| 2 +-
 sdks/python/container/py39/base_image_requirements.txt| 2 +-
 sdks/python/pyproject.toml| 2 +-
 sdks/python/setup.py  | 2 +-
 24 files changed, 9 insertions(+), 41 deletions(-)



(beam) branch master updated (1c0cfa1ccba -> 49e98e59fd2)

2024-08-09 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 1c0cfa1ccba Expose watermarkIdleDurationThreshold parameter to the user in SolaceIO (#32109)
 add 679e9d799e5 Upgrade Beam to use Cython 3.
 add c825434965e Update base image requirements.
 add 2b7e84239ea Remove now unneeded language level specifications.
 add 1de0c4670ee Add no-except to time-critical function.
 add 00de74030e2 Merge branch 'master' into cython3
 new 49e98e59fd2 Merge pull request #32112 Upgrade Apache Beam to use Cython 3.x.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/coders/coder_impl.py  | 2 --
 sdks/python/apache_beam/coders/stream.pyx | 2 --
 sdks/python/apache_beam/metrics/cells.py  | 2 --
 sdks/python/apache_beam/metrics/execution.py  | 2 --
 sdks/python/apache_beam/metrics/monitoring_infos.py   | 2 --
 sdks/python/apache_beam/runners/common.py | 1 -
 sdks/python/apache_beam/runners/worker/logger.py  | 2 --
 sdks/python/apache_beam/runners/worker/opcounters.py  | 2 --
 sdks/python/apache_beam/runners/worker/operations.py  | 2 --
 sdks/python/apache_beam/runners/worker/statesampler_fast.pyx  | 4 +---
 sdks/python/apache_beam/testing/fast_test_utils.pyx   | 2 --
 sdks/python/apache_beam/transforms/cy_combiners.py| 2 --
 .../apache_beam/transforms/cy_dataflow_distribution_counter.pyx   | 2 --
 sdks/python/apache_beam/transforms/stats.py   | 2 --
 sdks/python/apache_beam/utils/counters.py | 1 -
 sdks/python/apache_beam/utils/windowed_value.py   | 2 --
 sdks/python/container/base_image_requirements_manual.txt  | 4 +---
 sdks/python/container/py310/base_image_requirements.txt   | 2 +-
 sdks/python/container/py311/base_image_requirements.txt   | 2 +-
 sdks/python/container/py312/base_image_requirements.txt   | 2 +-
 sdks/python/container/py38/base_image_requirements.txt| 2 +-
 sdks/python/container/py39/base_image_requirements.txt| 2 +-
 sdks/python/pyproject.toml| 2 +-
 sdks/python/setup.py  | 2 +-
 24 files changed, 9 insertions(+), 41 deletions(-)



(beam) branch master updated: Minor optimization for the common case of merging empty string sets. (#31803)

2024-08-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 80ae93217c5 Minor optimization for the common case of merging empty string sets. (#31803)
80ae93217c5 is described below

commit 80ae93217c5ac74e41cbedaeea7806fb0f05c2a9
Author: Robert Bradshaw 
AuthorDate: Mon Aug 5 09:49:55 2024 -0700

Minor optimization for the common case of merging empty string sets. (#31803)
---
 .../apache/beam/runners/core/metrics/StringSetData.java | 17 ++---
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
index 93dfb8e3ebc..466d4ad46eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.core.metrics;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -50,12 +49,16 @@ public abstract class StringSetData implements Serializable {
    * Combines this {@link StringSetData} with other, both original StringSetData are left intact.
    */
   public StringSetData combine(StringSetData other) {
-    // do not merge other on this as this StringSetData might hold an immutable set like in case
-    // of  EmptyStringSetData
-    Set<String> combined = new HashSet<>();
-    combined.addAll(this.stringSet());
-    combined.addAll(other.stringSet());
-    return StringSetData.create(combined);
+    if (this.stringSet().isEmpty()) {
+      return other;
+    } else if (other.stringSet().isEmpty()) {
+      return this;
+    } else {
+      ImmutableSet.Builder<String> combined = ImmutableSet.builder();
+      combined.addAll(this.stringSet());
+      combined.addAll(other.stringSet());
+      return StringSetData.create(combined.build());
+    }
   }
 
   /**
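
The new `combine` returns one operand unchanged whenever the other is empty. It builds a fresh set only when both are non-empty. The following is a minimal, dependency-free sketch of that short-circuit:

- `StringSetSketch` is a hypothetical stand-in for `StringSetData`.
- `Set.copyOf` (Java 10+) replaces Guava's `ImmutableSet`, so the example compiles on its own.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical immutable holder of a string set; mirrors the combine() short-circuit. */
public final class StringSetSketch {
  private final Set<String> strings;

  public StringSetSketch(Set<String> strings) {
    // Immutable defensive copy, so sharing an instance between results is safe.
    this.strings = Set.copyOf(strings);
  }

  public Set<String> stringSet() {
    return strings;
  }

  /** Returns the union of both sets; neither operand is modified. */
  public StringSetSketch combine(StringSetSketch other) {
    if (this.strings.isEmpty()) {
      return other; // Common case: nothing to merge, reuse the existing instance.
    } else if (other.strings.isEmpty()) {
      return this;
    }
    Set<String> combined = new LinkedHashSet<>(this.strings);
    combined.addAll(other.strings);
    return new StringSetSketch(combined);
  }
}
```

Returning an operand as-is is safe only because the wrapped set is immutable. No caller can mutate a result that is shared with an input.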



(beam) branch master updated (eec20689f21 -> 346011b0a21)

2024-07-31 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from eec20689f21 Fix StringSet tests on portable runners (#31999)
 add 346011b0a21 Fix row ranges issue in Bigtable Read. (#31990)

No new revisions were added by this update.

Summary of changes:
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   | 13 +++--
 .../io/gcp/bigtable/BigtableServiceImplTest.java   | 62 ++
 2 files changed, 71 insertions(+), 4 deletions(-)



(beam) branch master updated: Export string sets in monitoring infos. (#31838)

2024-07-11 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 50a3403a474 Export string sets in monitoring infos. (#31838)
50a3403a474 is described below

commit 50a3403a4742e1c9e264f57f4411969daeff4642
Author: Robert Bradshaw 
AuthorDate: Thu Jul 11 09:45:09 2024 -0700

Export string sets in monitoring infos. (#31838)
---
 .../apache/beam/model/pipeline/v1/metrics.proto| 11 +
 .../runners/core/metrics/MetricsContainerImpl.java | 53 +-
 .../core/metrics/MonitoringInfoConstants.java  |  2 +
 .../core/metrics/SimpleMonitoringInfoBuilder.java  | 11 +
 .../core/metrics/MetricsContainerImplTest.java | 32 +
 5 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index 13c87bc1130..4ec189e4637 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -187,6 +187,17 @@ message MonitoringInfoSpecs {
       }]
     }];
 
+    // Represents a set of strings seen across bundles.
+    USER_SET_STRING = 21 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:set_string:v1",
+      type: "beam:metrics:set_string:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
     // General monitored state information which contains structured information
     // which does not fit into a typical metric format. See MonitoringTableData
     // for more details.
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index a2f6511d512..99cf9850850 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -19,13 +19,16 @@ package org.apache.beam.runners.core.metrics;
 
 import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Serializable;
@@ -331,6 +334,28 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
     return builder.build();
   }
 
+  /** @return The MonitoringInfo metadata from the string set metric. */
+  private @Nullable SimpleMonitoringInfoBuilder stringSetToMonitoringMetadata(MetricKey metricKey) {
+    return metricToMonitoringMetadata(
+        metricKey,
+        MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE,
+        MonitoringInfoConstants.Urns.USER_SET_STRING);
+  }
+
+  /**
+   * @param metricUpdate
+   * @return The MonitoringInfo generated from the string set metricUpdate.
+   */
+  private @Nullable MonitoringInfo stringSetUpdateToMonitoringInfo(
+      MetricUpdate<StringSetData> metricUpdate) {
+    SimpleMonitoringInfoBuilder builder = stringSetToMonitoringMetadata(metricUpdate.getKey());
+    if (builder == null) {
+      return null;
+    }
+    builder.setStringSetValue(metricUpdate.getUpdate());
+    return builder.build();
+  }
+
   /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
   @Override
   public Iterable<MonitoringInfo> getMonitoringInfos() {
@@ -358,

(beam) branch master updated: Merge pull request #31823 Add lineage information for BigQuery sinks.

2024-07-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 7c0cf39001a Merge pull request #31823 Add lineage information for BigQuery sinks.
7c0cf39001a is described below

commit 7c0cf39001acdedd5a1339f7def059c915194cbc
Author: Robert Bradshaw 
AuthorDate: Wed Jul 10 12:45:49 2024 -0700

Merge pull request #31823 Add lineage information for BigQuery sinks.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  | 21 +++--
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java|  5 ++--
 .../io/gcp/bigquery/BigQueryStorageSourceBase.java |  4 +--
 .../beam/sdk/io/gcp/bigquery/CreateTables.java |  8 -
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 14 +
 .../bigquery/StorageApiWritesShardedRecords.java   |  6 
 .../beam/sdk/io/gcp/bigquery/WriteRename.java  |  6 
 .../beam/sdk/io/gcp/bigquery/WriteTables.java  |  6 
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 34 +++---
 9 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 7f5d675ccf7..61bed66a336 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -412,9 +413,23 @@ public class BigQueryHelpers {
     return sb.toString();
   }
 
-  public static String dataCatalogName(TableReference ref) {
-    return String.format(
-        "bigquery:%s.%s.%s", ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+  public static String dataCatalogName(TableReference ref, BigQueryOptions options) {
+    String tableIdBase;
+    int ix = ref.getTableId().indexOf('$');
+    if (ix == -1) {
+      tableIdBase = ref.getTableId();
+    } else {
+      tableIdBase = ref.getTableId().substring(0, ix);
+    }
+    String projectId;
+    if (!Strings.isNullOrEmpty(ref.getProjectId())) {
+      projectId = ref.getProjectId();
+    } else if (!Strings.isNullOrEmpty(options.getBigQueryProject())) {
+      projectId = options.getBigQueryProject();
+    } else {
+      projectId = options.getProject();
+    }
+    return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(), tableIdBase);
   }
 
   static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index a863c49f46a..38c0c8e43b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -121,7 +121,7 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
 BigQueryHelpers.toTableSpec(tableToExtract)));
       }
       // emit this table ID as a lineage source
-      Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract));
+      Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract, bqOptions));
 
       TableSchema schema = table.getSchema();
       JobService jobService = bqServices.getJobService(bqOptions);
@@ -158,7 +158,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
       if (res.extractedFiles.size() > 0) {
         BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
         // emit this table ID as a lineage source
-        Lineage.getSources().add(BigQueryHelpers.dataCatalogName(getTableToExtract(bqOptions)));
+        Lineage.getSources()
+            .add(BigQueryHelpers.dataCatalogName(getTableToExtract(bqOptions), bqOptions));
         final String extractDestinationDir =
             resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
 // Match all files in the de
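
The revised `dataCatalogName` does two things:

- It drops any partition decorator, i.e. everything from `$` onward in the table ID.
- It resolves the project with a fallback chain: the table reference's project, then the `bigQueryProject` option, then the pipeline's `project` option.

The sketch below reproduces that logic with plain strings. `LineageNameSketch` and its parameter list are hypothetical substitutes for `TableReference` and `BigQueryOptions`. A small local helper stands in for Guava's `Strings.isNullOrEmpty`.

```java
/** Hypothetical sketch of the lineage-name logic in BigQueryHelpers.dataCatalogName. */
public final class LineageNameSketch {

  private static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  public static String dataCatalogName(
      String refProject, String datasetId, String tableId,
      String bigQueryProject, String defaultProject) {
    // Strip a partition decorator such as "$20240710" from the table ID.
    int ix = tableId.indexOf('$');
    String tableIdBase = (ix == -1) ? tableId : tableId.substring(0, ix);

    // Prefer the table reference's project, then the BigQuery-specific option,
    // then the pipeline's default project.
    String projectId;
    if (!isNullOrEmpty(refProject)) {
      projectId = refProject;
    } else if (!isNullOrEmpty(bigQueryProject)) {
      projectId = bigQueryProject;
    } else {
      projectId = defaultProject;
    }
    return String.format("bigquery:%s.%s.%s", projectId, datasetId, tableIdBase);
  }
}
```

Stripping the decorator means writes to different partitions of one table report the same lineage name.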

(beam) branch master updated (080c80a9573 -> 36961405769)

2024-07-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 080c80a9573 Moving to 2.59.0-SNAPSHOT on master branch.
 add 43b237e5040 Modernize python type hints for apache_beam
 add cd495e9cf8a Modernize python type hints for apache_beam/coders
 add 6143cd0e0a8 Modernize python type hints for apache_beam/dataframe
 add d75916b6342 Modernize python type hints for apache_beam/examples/cookbook
 add c842252b4b5 Modernize python type hints for apache_beam/internal
 add 33bde4de9c5 Modernize python type hints for apache_beam/internal/metrics
 add d73982af054 Modernize python type hints for apache_beam/io
 add 8f6f24dc1ff Modernize python type hints for apache_beam/io/azure
 add b8029e9aa1e Modernize python type hints for apache_beam/io/flink
 add f49a29a4685 Modernize python type hints for apache_beam/io/gcp
 add 5d5a09b6b3e Modernize python type hints for apache_beam/metrics
 add fbafe8d780b Modernize python type hints for apache_beam/ml/gcp
 add 0eab29802a3 Modernize python type hints for apache_beam/options
 add 842b8ecab01 Modernize python type hints for apache_beam/runners
 add 0763d7e50f3 Modernize python type hints for apache_beam/runners/direct
 add 3cf0c5512a3 Modernize python type hints for apache_beam/runners/interactive
 add b41698210df Modernize python type hints for apache_beam/runners/job
 add 8b540eb5075 Modernize python type hints for apache_beam/runners/portability
 add 8fdbe88dc6e Modernize python type hints for apache_beam/runners/worker
 add 77d81895109 Modernize python type hints for apache_beam/testing/benchmarks
 add acfd72c7c06 Modernize python type hints for apache_beam/testing/load_tests
 add 4402f2d44e9 Modernize python type hints for apache_beam/transforms
 add 79d4ffd20ba Modernize python type hints for apache_beam/typehints
 add f2ffa5ec49e Modernize python type hints for apache_beam/utils
 add abdb1b742a9 Fix circular references, mypy complaints.
 add d4de077a2a9 Fix bad type declarations.
 add 14c52d66ec6 Fix bad typing in PubSub tests.
 add 64e6194b948 Preserve existing linter comments.
 add a0ba8dea7d8 isort
 add 36961405769 Merge pull request #31755 Modernize type hints.

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/coders/observable_test.py  |   2 +-
 sdks/python/apache_beam/coders/row_coder.py|   3 +-
 sdks/python/apache_beam/coders/slow_stream.py  |  32 +-
 .../apache_beam/coders/standard_coders_test.py |   2 +-
 sdks/python/apache_beam/coders/typecoders.py   |  20 +-
 sdks/python/apache_beam/dataframe/convert.py   |  38 +--
 sdks/python/apache_beam/dataframe/frame_base.py|  20 +-
 sdks/python/apache_beam/dataframe/partitionings.py |   9 +-
 sdks/python/apache_beam/dataframe/schemas.py   |   9 +-
 sdks/python/apache_beam/dataframe/schemas_test.py  |  85 --
 .../examples/cookbook/bigtableio_it_test.py|   2 +-
 .../examples/cookbook/datastore_wordcount.py   |   4 +-
 .../apache_beam/internal/cloudpickle_pickler.py|   4 +-
 sdks/python/apache_beam/internal/dill_pickler.py   |   7 +-
 sdks/python/apache_beam/internal/metrics/cells.py  |  27 +-
 sdks/python/apache_beam/internal/metrics/metric.py |  66 +++--
 sdks/python/apache_beam/internal/module_test.py|   4 +-
 sdks/python/apache_beam/internal/pickler.py|   3 +-
 sdks/python/apache_beam/internal/util.py   |  11 +-
 .../apache_beam/io/azure/blobstoragefilesystem.py  |   4 -
 sdks/python/apache_beam/io/filebasedsource.py  |  24 +-
 sdks/python/apache_beam/io/filesystem.py   |  77 ++---
 .../io/flink/flink_streaming_impulse_source.py |   2 +-
 .../apache_beam/io/gcp/bigquery_avro_tools.py  |  26 +-
 .../apache_beam/io/gcp/bigquery_schema_tools.py|   5 +-
 .../apache_beam/io/gcp/datastore/v1new/helper.py   |   4 +-
 .../apache_beam/io/gcp/datastore/v1new/types.py|  19 +-
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py|   8 +-
 sdks/python/apache_beam/io/gcp/pubsub.py   |  58 ++--
 sdks/python/apache_beam/io/gcp/pubsub_test.py  |  10 +-
 sdks/python/apache_beam/io/hadoopfilesystem.py |  12 +-
 sdks/python/apache_beam/io/iobase.py   |  79 ++---
 sdks/python/apache_beam/io/jdbc.py |  18 +-
 sdks/python/apache_beam/io/localfilesystem.py  |   8 +-
 sdks/python/apache_beam/io/restriction_trackers.py |   9 +-
 sdks/python/apache_beam/io/textio.py   |  66 ++---
 sdks/python/apache_beam/metrics/metric.py  |  94 +++---
 sdks/python/apache_beam/metrics/metricbase.py  |  12 +-
 .../python/apache_beam/ml/gcp/naturallanguageml.py |  36 +--
 sdks/python/apache_beam/options/value_provider.py  |   2 +-
 sdks/python/apache_beam/pvalue.py  | 125 
 .../apache_beam

(beam) 01/01: Merge pull request #31805 Introduce support for emitting lineage in BQ Source.

2024-07-09 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c9adc8ee6c680c7d40125ea9f77f79ec307bb46e
Merge: f72f6ce0e81 c827bbac387
Author: Robert Bradshaw 
AuthorDate: Tue Jul 9 19:48:58 2024 -0700

Merge pull request #31805 Introduce support for emitting lineage in BQ Source.

 .../java/org/apache/beam/sdk/metrics/Lineage.java  | 41 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  |  5 +++
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java|  6 +++-
 .../io/gcp/bigquery/BigQueryStorageSourceBase.java | 13 +--
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java| 27 +-
 5 files changed, 88 insertions(+), 4 deletions(-)



(beam) branch master updated (f72f6ce0e81 -> c9adc8ee6c6)

2024-07-09 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from f72f6ce0e81 Remove CsvIOParseResult (#31819)
 add 5579a16de7d Introduce support for emitting lineage in BiqQueryIOs
 add dded4f06d82 Be spotless
 add 024692647b4 A couple improvements to BQ source lineage.
 add c827bbac387 Update contains test.
 new c9adc8ee6c6 Merge pull request #31805 Introduce support for emitting 
lineage in BQ Source.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sdk/metrics/{NoOpCounter.java => Lineage.java} | 36 --
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  |  5 +++
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java|  6 +++-
 .../io/gcp/bigquery/BigQueryStorageSourceBase.java | 13 ++--
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java| 27 +++-
 5 files changed, 60 insertions(+), 27 deletions(-)
 copy 
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/{NoOpCounter.java => 
Lineage.java} (53%)



(beam) branch master updated (ef143aed418 -> dda0fbf57be)

2024-07-09 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from ef143aed418 Add link to security model (#31811)
 add 78bab0dd15e Avoid length-prefix-bytes substitutions for Flink 
boundaries.
 new dda0fbf57be Merge pull request #31579 Avoid length-prefix-bytes 
substitutions for Flink boundaries.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/adapter/BeamAdapterCoderUtils.java   | 16 +++
 .../runners/flink/adapter/BeamAdapterUtils.java| 22 ++
 .../flink/adapter/BeamFlinkDataSetAdapter.java |  1 -
 .../flink/adapter/BeamFlinkDataStreamAdapter.java  |  1 -
 .../flink/adapter/BeamFlinkDataSetAdapterTest.java | 50 ++
 .../wire/LengthPrefixUnknownCoders.java| 18 +++-
 6 files changed, 105 insertions(+), 3 deletions(-)



(beam) 01/01: Merge pull request #31579 Avoid length-prefix-bytes substitutions for Flink boundaries.

2024-07-09 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit dda0fbf57be84d48c809de0a80bf6ebba77740e6
Merge: ef143aed418 78bab0dd15e
Author: Robert Bradshaw 
AuthorDate: Tue Jul 9 11:37:33 2024 -0700

Merge pull request #31579 Avoid length-prefix-bytes substitutions for Flink 
boundaries.

 .../flink/adapter/BeamAdapterCoderUtils.java   | 16 +++
 .../runners/flink/adapter/BeamAdapterUtils.java| 22 ++
 .../flink/adapter/BeamFlinkDataSetAdapter.java |  1 -
 .../flink/adapter/BeamFlinkDataStreamAdapter.java  |  1 -
 .../flink/adapter/BeamFlinkDataSetAdapterTest.java | 50 ++
 .../wire/LengthPrefixUnknownCoders.java| 18 +++-
 6 files changed, 105 insertions(+), 3 deletions(-)



(beam) branch master updated (746f3c5557e -> de4645d4507)

2024-07-08 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 746f3c5557e Use go 1.22 for self-hosted GHAs (#31767)
 add de4645d4507 Add support for StringSet metric in Java SDK. (#31789)

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/model/pipeline/v1/metrics.proto|   8 ++
 .../runners/core/metrics/DefaultMetricResults.java |  14 ++-
 .../beam/runners/core/metrics/MetricUpdates.java   |  34 ---
 .../runners/core/metrics/MetricsContainerImpl.java |  57 +++-
 .../core/metrics/MetricsContainerStepMap.java  |   6 ++
 .../core/metrics/MonitoringInfoConstants.java  |   2 +
 .../core/metrics/MonitoringInfoEncodings.java  |  26 ++
 .../{CounterCell.java => StringSetCell.java}   |  74 ---
 .../beam/runners/core/metrics/StringSetData.java   |  97 
 .../core/metrics/MetricsContainerImplTest.java |  14 +++
 .../core/metrics/MetricsContainerStepMapTest.java  | 102 +
 .../core/metrics/MonitoringInfoEncodingsTest.java  |  28 ++
 .../runners/core/metrics/StringSetCellTest.java|  97 
 .../runners/core/metrics/StringSetDataTest.java| 102 +
 .../apache/beam/runners/direct/DirectMetrics.java  |  45 -
 .../beam/runners/direct/DirectMetricsTest.java |  26 +-
 .../metrics/CustomMetricQueryResults.java  |  11 +++
 .../extensions/metrics/MetricsHttpSinkTest.java|   7 +-
 .../beam/runners/dataflow/DataflowMetrics.java |  41 ++---
 .../beam/runners/dataflow/DataflowMetricsTest.java |  59 
 .../dataflow/worker/BatchModeExecutionContext.java |   7 +-
 .../dataflow/worker/DataflowMetricsContainer.java  |   6 ++
 .../worker/MetricsToCounterUpdateConverter.java|  18 +++-
 .../worker/StreamingStepMetricsContainer.java  |  27 +-
 .../worker/BatchModeExecutionContextTest.java  |  34 +++
 .../worker/StreamingStepMetricsContainerTest.java  |  58 
 .../runners/jet/FailedRunningPipelineResults.java  |   6 ++
 .../beam/runners/jet/metrics/JetMetricResults.java |  54 ++-
 .../runners/jet/metrics/JetMetricsContainer.java   |  24 -
 .../{DistributionImpl.java => StringSetImpl.java}  |  32 +++
 .../beam/runners/portability/PortableMetrics.java  |  40 +++-
 .../runners/portability/PortableRunnerTest.java|  17 
 .../beam/sdk/metrics/MetricQueryResults.java   |  13 ++-
 .../org/apache/beam/sdk/metrics/MetricResult.java  |   2 +-
 .../java/org/apache/beam/sdk/metrics/Metrics.java  |  46 ++
 .../apache/beam/sdk/metrics/MetricsContainer.java  |   6 ++
 .../org/apache/beam/sdk/metrics/StringSet.java |  22 +++--
 .../apache/beam/sdk/metrics/StringSetResult.java   |  61 
 ...unterMetrics.java => UsesStringSetMetrics.java} |   6 +-
 .../org/apache/beam/sdk/metrics/MetricsTest.java   |  65 -
 .../beam/sdk/metrics/StringSetResultTest.java  |  64 +
 .../fn/harness/control/ExecutionStateSampler.java  |   9 ++
 .../harness/control/ExecutionStateSamplerTest.java |  22 +
 43 files changed, 1366 insertions(+), 123 deletions(-)
 copy 
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/{CounterCell.java
 => StringSetCell.java} (59%)
 create mode 100644 
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
 create mode 100644 
runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
 create mode 100644 
runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java
 copy 
runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/{DistributionImpl.java
 => StringSetImpl.java} (55%)
 copy 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BeamStoppableFunction.java
 => sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSet.java 
(68%)
 create mode 100644 
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java
 copy 
sdks/java/core/src/main/java/org/apache/beam/sdk/testing/{UsesCounterMetrics.java
 => UsesStringSetMetrics.java} (85%)
 create mode 100644 
sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/StringSetResultTest.java
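
The StringSet metric introduced here reports a set of distinct strings rather than a number. Below is a rough Python sketch of those semantics. It assumes that per-worker values are merged by set union. `StringSetAccumulator` and `combine` are hypothetical illustrations of the idea. They are not the Java classes listed above and not a Beam Python API.

```python
from typing import FrozenSet, Iterable, Set


class StringSetAccumulator:
  """Hypothetical per-worker accumulator of distinct string values."""

  def __init__(self) -> None:
    self._values: Set[str] = set()

  def add(self, value: str) -> None:
    # Adding a value that is already present has no effect.
    self._values.add(value)

  def snapshot(self) -> FrozenSet[str]:
    return frozenset(self._values)


def combine(snapshots: Iterable[FrozenSet[str]]) -> FrozenSet[str]:
  """Merges snapshots from several workers by set union."""
  merged: FrozenSet[str] = frozenset()
  for snapshot in snapshots:
    merged = merged | snapshot
  return merged


worker_a, worker_b = StringSetAccumulator(), StringSetAccumulator()
for value in ['a', 'b', 'a']:
  worker_a.add(value)
for value in ['b', 'c']:
  worker_b.add(value)
print(sorted(combine([worker_a.snapshot(), worker_b.snapshot()])))
# ['a', 'b', 'c']
```

Because union is associative and idempotent, snapshots can be merged in any order or more than once without changing the result.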



(beam) branch master updated (1e873f42e14 -> 3212688e2e6)

2024-07-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 1e873f42e14 Use highmem runner for 
beam_PostRelease_NightlySnapshot.yml (#31749)
 add b68b29a3ff6 Basic yaml-defined provider.
 add d2df083a029 Refactor jinja templatiziation to common location.
 add 3212688e2e6 Merge pull request #31684 Basic yaml-defined provider.

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/typehints/schemas.py   |  8 +--
 sdks/python/apache_beam/yaml/json_utils.py |  3 +-
 sdks/python/apache_beam/yaml/main.py   | 15 +-
 sdks/python/apache_beam/yaml/yaml_provider.py  | 57 +
 .../apache_beam/yaml/yaml_provider_unit_test.py| 58 ++
 sdks/python/apache_beam/yaml/yaml_transform.py | 25 --
 6 files changed, 146 insertions(+), 20 deletions(-)



(beam) branch master updated (9bdcb672d08 -> c3756c04e39)

2024-06-28 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 9bdcb672d08 Create CsvIOParseConfiguration class (#31714)
 add c3756c04e39 Remove excessive logging in test. (#31715)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/parquetio_test.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)



(beam) branch master updated: [YAML] Allow explicitly including external provider lists. (#31604)

2024-06-20 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d61e04ad624 [YAML] Allow explicitly including external provider lists. 
(#31604)
d61e04ad624 is described below

commit d61e04ad624dc1eff854988e3ea8da60a53f0715
Author: Robert Bradshaw 
AuthorDate: Thu Jun 20 17:58:32 2024 -0700

[YAML] Allow explicitly including external provider lists. (#31604)

This can be more useful than jinja's {% include %} as it can refer
to urls, does not have to assume perfect indentation of the included
file, and avoids applying templatization to the included file.
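
Below is a minimal sketch of the flattening this commit describes. It assumes that provider specs are already-parsed dicts and that a caller-supplied `load(uri)` returns the list of specs found at a URI. `flatten_includes` and `load` are hypothetical names, not Beam's API. The real `flatten_included_provider_specs` also strips `__line__`/`__uuid__` metadata before checking that `include` is the only key. The recursion into nested includes is an assumption of this sketch, because the truncated diff in this email does not show it.

```python
from typing import Any, Callable, Iterable, Iterator, List, Mapping

Spec = Mapping[str, Any]


def flatten_includes(
    provider_specs: Iterable[Spec],
    load: Callable[[str], List[Spec]]) -> Iterator[Spec]:
  """Yields provider specs, expanding each {'include': uri} entry in place."""
  for spec in provider_specs:
    if 'include' in spec:
      # Mirrors the commit's rule: `include` must be the only parameter.
      if len(spec) != 1:
        raise ValueError(
            f'When using include, it must be the only parameter: {spec}')
      # Assumption: included files may themselves contain includes.
      yield from flatten_includes(load(spec['include']), load)
    else:
      yield spec


# Hypothetical "files", keyed by URI, standing in for fetched YAML content.
files = {
    'https://example.com/providers.yaml': [
        {'type': 'javaJar', 'transforms': {'A': 'urn:a'}},
        {'include': 'nested.yaml'},
    ],
    'nested.yaml': [{'type': 'python', 'transforms': {'B': 'pkg.b'}}],
}
specs = [{'include': 'https://example.com/providers.yaml'}, {'type': 'renaming'}]
print([s['type'] for s in flatten_includes(specs, files.__getitem__)])
# ['javaJar', 'python', 'renaming']
```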
---
 sdks/python/apache_beam/yaml/pipeline.schema.yaml  | 23 ++-
 sdks/python/apache_beam/yaml/yaml_provider.py  | 51 +++---
 .../apache_beam/yaml/yaml_provider_unit_test.py| 80 ++
 sdks/python/apache_beam/yaml/yaml_transform.py | 10 +--
 4 files changed, 146 insertions(+), 18 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/pipeline.schema.yaml 
b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
index 40f576c1618..f68a7306d94 100644
--- a/sdks/python/apache_beam/yaml/pipeline.schema.yaml
+++ b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
@@ -154,6 +154,27 @@ $defs:
       - transforms
       - config
 
+  providerInclude:
+    # TODO(robertwb): Consider enumerating the provider types along with
+    # the arguments they accept/expect (possibly in a separate schema file).
+    type: object
+    properties:
+      include: { type: string }
+      __line__: {}
+      __uuid__: {}
+    additionalProperties: false
+    required:
+      - include
+
+  providerOrProviderInclude:
+    if:
+      properties:
+        include {}
+    then:
+      $ref: '#/$defs/providerInclude'
+    else:
+      $ref: '#/$defs/provider'
+
 type: object
 properties:
   pipeline:
@@ -185,7 +206,7 @@ properties:
   providers:
     type: array
     items:
-      $ref: '#/$defs/provider'
+      $ref: '#/$defs/providerOrProviderInclude'
   options:
     type: object
 required:
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py 
b/sdks/python/apache_beam/yaml/yaml_provider.py
index 794cad0ec7f..46ec0700ee2 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -34,6 +34,7 @@ from typing import Any
 from typing import Callable
 from typing import Dict
 from typing import Iterable
+from typing import Iterator
 from typing import Mapping
 from typing import Optional
 
@@ -45,6 +46,7 @@ import apache_beam as beam
 import apache_beam.dataframe.io
 import apache_beam.io
 import apache_beam.transforms.util
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.portability.api import schema_pb2
 from apache_beam.runners import pipeline_context
 from apache_beam.testing.util import assert_that
@@ -222,7 +224,10 @@ class ExternalProvider(Provider):
       config['version'] = beam_version
     if type in cls._provider_types:
       try:
-        return cls._provider_types[type](urns, **config)
+        result = cls._provider_types[type](urns, **config)
+        if not hasattr(result, 'to_json'):
+          result.to_json = lambda: spec
+        return result
       except Exception as exn:
         raise ValueError(
             f'Unable to instantiate provider of type {type} '
@@ -1153,18 +1158,44 @@ class RenamingProvider(Provider):
     self._underlying_provider.cache_artifacts()
 
 
-def parse_providers(provider_specs):
-  providers = collections.defaultdict(list)
+def flatten_included_provider_specs(
+    provider_specs: Iterable[Mapping]) -> Iterator[Mapping]:
+  from apache_beam.yaml.yaml_transform import SafeLineLoader
   for provider_spec in provider_specs:
-    provider = ExternalProvider.provider_from_spec(provider_spec)
-    for transform_type in provider.provided_transforms():
-      providers[transform_type].append(provider)
-      # TODO: Do this better.
-      provider.to_json = lambda result=provider_spec: result
-  return providers
+    if 'include' in provider_spec:
+      if len(SafeLineLoader.strip_metadata(provider_spec)) != 1:
+        raise ValueError(
+            f"When using include, it must be the only parameter: "
+            f"{provider_spec} "
+            f"at line {{SafeLineLoader.get_line(provider_spec)}}")
+      include_uri = provider_spec['include']
+      try:
+        with urllib.request.urlopen(include_uri) as response:
+          content = response.read()
+      except (ValueError, urllib.error.URLError) as exn:
+        if 'unknown url type' in str(exn):
+          with FileSystems.open(include_uri) as fin:
+            content = fin.read()
+        else:
+          rais

(beam) branch master updated (eaea331981c -> 0a392e9c238)

2024-06-18 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from eaea331981c Add counters for bytes read/written per gcs bucket (#31498)
 add 0a392e9c238 Feature add metrics counting the number of bytes 
read/written from/to GCS bucket per job (#31466)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/gcp/gcsio.py| 55 ---
 sdks/python/apache_beam/io/gcp/gcsio_test.py   | 62 +-
 .../python/apache_beam/options/pipeline_options.py | 13 +
 .../runners/dataflow/internal/apiclient.py |  4 ++
 4 files changed, 125 insertions(+), 9 deletions(-)



(beam) branch master updated (8e308cd666f -> eaea331981c)

2024-06-18 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 8e308cd666f Merge pull request #31643: Use correct name for tox doc 
task.
 add eaea331981c Add counters for bytes read/written per gcs bucket (#31498)

No new revisions were added by this update.

Summary of changes:
 .../sdk/extensions/gcp/options/GcsOptions.java |  30 +
 .../beam/sdk/extensions/gcp/util/GcsUtil.java  | 109 +++--
 .../util/channels/CountingReadableByteChannel.java |  63 ++
 .../util/channels/CountingSeekableByteChannel.java | 116 ++
 .../util/channels/CountingWritableByteChannel.java |  60 ++
 .../gcp/util/channels}/package-info.java   |   4 +-
 .../beam/sdk/extensions/gcp/util/GcsUtilTest.java  | 131 -
 .../CountingChannelsIsOpenCloseMethodsTest.java| 105 +
 .../channels/CountingChannelsReadMethodsTest.java  | 122 +++
 .../channels/CountingChannelsWriteMethodsTest.java | 118 +++
 .../channels/CountingSeekableByteChannelTest.java  |  68 +++
 11 files changed, 912 insertions(+), 14 deletions(-)
 create mode 100644 
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/channels/CountingReadableByteChannel.java
 create mode 100644 
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/channels/CountingSeekableByteChannel.java
 create mode 100644 
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/channels/CountingWritableByteChannel.java
 copy 
{.test-infra/pipelines/src/main/java/org/apache/beam/testinfra/pipelines/bigquery
 => 
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/channels}/package-info.java
 (87%)
 create mode 100644 
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/channels/CountingChannelsIsOpenCloseMethodsTest.java
 create mode 100644 
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/channels/CountingChannelsReadMethodsTest.java
 create mode 100644 
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/channels/CountingChannelsWriteMethodsTest.java
 create mode 100644 
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/channels/CountingSeekableByteChannelTest.java
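
Both commits count bytes per GCS bucket by wrapping the underlying stream. The Java change adds `CountingReadableByteChannel` and related wrappers. The same decorator idea can be sketched in Python with the standard `io` module. `CountingReader` and its `on_bytes` callback are hypothetical, and `io.BytesIO` stands in for a real GCS download stream. This sketch is not the `gcsio.py` implementation.

```python
import io
from typing import Callable, Dict


class CountingReader(io.RawIOBase):
  """Hypothetical read-only wrapper that reports each chunk's size."""

  def __init__(self, inner: io.BufferedIOBase,
               on_bytes: Callable[[int], None]) -> None:
    super().__init__()
    self._inner = inner
    self._on_bytes = on_bytes

  def readable(self) -> bool:
    return True

  def readinto(self, buffer) -> int:
    # RawIOBase.read() and readall() are built on top of readinto().
    n = self._inner.readinto(buffer)
    if n:  # 0 at end of stream; nothing to count
      self._on_bytes(n)
    return n

  def close(self) -> None:
    self._inner.close()
    super().close()


bytes_read: Dict[str, int] = {}


def record(bucket: str) -> Callable[[int], None]:
  """Returns a callback that accumulates byte counts for one bucket."""
  def add(n: int) -> None:
    bytes_read[bucket] = bytes_read.get(bucket, 0) + n
  return add


with CountingReader(io.BytesIO(b'hello world'), record('my-bucket')) as f:
  first = f.read(5)
  rest = f.read()
print(first, rest, bytes_read)
# b'hello' b' world' {'my-bucket': 11}
```

Keeping the counting in a wrapper leaves the reader's callers unchanged. A writable counterpart would override `write()` in the same way.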



(beam) branch master updated (5dd2d3f5edb -> c2207d82af2)

2024-06-11 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 5dd2d3f5edb [YAML] Add MLTransform. (#30002)
 add c2207d82af2 [YAML] Allow constants and simple comparisons in generic 
expressions. (#31455)

No new revisions were added by this update.

Summary of changes:
 .../JavaMapToFieldsTransformProvider.java  |   7 --
 .../apache_beam/yaml/standard_providers.yaml   |   5 +
 .../yaml/tests/{parquet.yaml => map.yaml}  |  33 +++---
 sdks/python/apache_beam/yaml/yaml_mapping.py   | 117 +
 sdks/python/apache_beam/yaml/yaml_transform.py |   7 ++
 .../site/content/en/documentation/sdks/yaml-udf.md |  27 -
 6 files changed, 148 insertions(+), 48 deletions(-)
 copy sdks/python/apache_beam/yaml/tests/{parquet.yaml => map.yaml} (65%)



(beam) branch master updated: [YAML] Add MLTransform. (#30002)

2024-06-11 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 5dd2d3f5edb [YAML] Add MLTransform. (#30002)
5dd2d3f5edb is described below

commit 5dd2d3f5edb1e67583badb59a55eac3b6fe20bd2
Author: Robert Bradshaw 
AuthorDate: Tue Jun 11 13:23:15 2024 -0700

[YAML] Add MLTransform. (#30002)
---
 sdks/python/apache_beam/ml/transforms/handlers.py  | 18 +++--
 sdks/python/apache_beam/ml/transforms/tft.py   |  6 +-
 .../apache_beam/yaml/standard_providers.yaml   |  5 ++
 sdks/python/apache_beam/yaml/yaml_ml.py| 66 
 sdks/python/apache_beam/yaml/yaml_ml_test.py   | 92 ++
 5 files changed, 178 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/ml/transforms/handlers.py 
b/sdks/python/apache_beam/ml/transforms/handlers.py
index 5bcd0d16576..7a912f2d88e 100644
--- a/sdks/python/apache_beam/ml/transforms/handlers.py
+++ b/sdks/python/apache_beam/ml/transforms/handlers.py
@@ -147,12 +147,7 @@ class _ConvertNamedTupleToDict(
     Returns:
       A PCollection of dictionaries.
     """
-    if isinstance(pcoll.element_type, RowTypeConstraint):
-      # Row instance
-      return pcoll | beam.Map(lambda x: x.as_dict())
-    else:
-      # named tuple
-      return pcoll | beam.Map(lambda x: x._asdict())
+    return pcoll | beam.Map(lambda x: x._asdict())
 
 
 class TFTProcessHandler(ProcessHandler[tft_process_handler_input_type,
@@ -404,6 +399,17 @@ class TFTProcessHandler(ProcessHandler[tft_process_handler_input_type,
       raw_data_metadata = metadata_io.read_metadata(
           os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR))
 
+      element_type = raw_data.element_type
+      if (isinstance(element_type, RowTypeConstraint) or
+          native_type_compatibility.match_is_named_tuple(element_type)):
+        # convert Row or NamedTuple to Dict
+        column_type_mapping = self._map_column_names_to_types(
+            row_type=element_type)
+        raw_data = (
+            raw_data
+            | _ConvertNamedTupleToDict().with_output_types(
+                Dict[str, typing.Union[tuple(column_type_mapping.values())]]))  # type: ignore
+
     feature_set = [feature.name for feature in raw_data_metadata.schema.feature]
 
     # TFT ignores columns in the input data that aren't explicitly defined
diff --git a/sdks/python/apache_beam/ml/transforms/tft.py 
b/sdks/python/apache_beam/ml/transforms/tft.py
index e2f02971e7c..e03d52214c9 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -464,8 +464,8 @@ class TFIDF(TFTOperation):
     This function applies a tf-idf transformation on the given columns
     of incoming data.
 
-    TFIDF outputs two artifacts for each column: the vocabu index and
-    the tfidf weight. The vocabu index is a mapping from the original
+    TFIDF outputs two artifacts for each column: the vocabulary index and
+    the tfidf weight. The vocabulary index is a mapping from the original
     vocabulary to the new vocabulary. The tfidf weight is a mapping
     from the original vocabulary to the tfidf score.
 
@@ -636,7 +636,7 @@ class BagOfWords(TFTOperation):
       compute_word_count: A boolean that specifies whether to compute
         the unique word count over the entire dataset. Defaults to False.
       key_vocab_filename: The file name for the key vocabulary file when
-        compute_word_count is True. If empty, a file name 
+        compute_word_count is True. If empty, a file name
         will be chosen based on the current scope. If provided, the vocab
         file will be suffixed with the column name.
       name: A name for the operation (optional).
diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml 
b/sdks/python/apache_beam/yaml/standard_providers.yaml
index 8d0037d4dd9..e666513094f 100644
--- a/sdks/python/apache_beam/yaml/standard_providers.yaml
+++ b/sdks/python/apache_beam/yaml/standard_providers.yaml
@@ -52,6 +52,11 @@
      Flatten: 'beam:schematransform:org.apache.beam:yaml:flatten:v1'
      LogForTesting: 'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1'
 
+- type: 'python'
+  config: {}
+  transforms:
+    MLTransform: 'apache_beam.yaml.yaml_ml.ml_transform'
+
 - type: renaming
   transforms:
     'MapToFields-java': 'MapToFields-java'
diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py 
b/sdks/python/apache_beam/yaml/yaml_ml.py
new file mode 100644
index 000..33f2eeefd29
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/yaml_ml.py
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for add

(beam) 01/01: Merge pull request #31537 Add documentation and types to ReadFrom/WriteToBigQuery.

2024-06-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 814cc8d3785c40e232dd12aef1c8eedac5998232
Merge: ab94c8fc633 3267f7c8a5d
Author: Robert Bradshaw 
AuthorDate: Mon Jun 10 14:44:05 2024 -0700

Merge pull request #31537 Add documentation and types to 
ReadFrom/WriteToBigQuery.

 sdks/python/apache_beam/yaml/yaml_io.py | 58 ++---
 1 file changed, 53 insertions(+), 5 deletions(-)



(beam) branch master updated (ab94c8fc633 -> 814cc8d3785)

2024-06-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from ab94c8fc633 Merge pull request #31549 Allow flags to be used as jinja 
template variables.
 add acb763a7ac8 Add types to ReadFrom/WriteToBigQuery.
 add 0eb8993a19a Full docstrings.
 add 3267f7c8a5d Better use of Optional typing.
 new 814cc8d3785 Merge pull request #31537 Add documentation and types to 
ReadFrom/WriteToBigQuery.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/yaml/yaml_io.py | 58 ++---
 1 file changed, 53 insertions(+), 5 deletions(-)



(beam) 01/01: Merge pull request #31549 Allow flags to be used as jinja template variables.

2024-06-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ab94c8fc6331f92d43d0af1e83f6e9e3492b4ff7
Merge: d560aa63d58 b4c83a69cef
Author: Robert Bradshaw 
AuthorDate: Mon Jun 10 14:43:44 2024 -0700

Merge pull request #31549 Allow flags to be used as jinja template 
variables.

 sdks/python/apache_beam/yaml/main.py  | 44 +--
 sdks/python/apache_beam/yaml/main_test.py | 30 +
 2 files changed, 72 insertions(+), 2 deletions(-)



(beam) branch master updated (d560aa63d58 -> ab94c8fc633)

2024-06-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from d560aa63d58 make the table name unique in ReadUsingStorageApiTests 
(#31552)
 add a8205d789a8 Allow flags to be used as jinja template variables.
 add b4c83a69cef Less confusing name for argument parser.
 new ab94c8fc633 Merge pull request #31549 Allow flags to be used as jinja 
template variables.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/yaml/main.py  | 44 +--
 sdks/python/apache_beam/yaml/main_test.py | 30 +
 2 files changed, 72 insertions(+), 2 deletions(-)



(beam) branch master updated (d01c24ce64f -> e26eb125f9f)

2024-06-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from d01c24ce64f Update aws testcontainer to make it compatible with 
testcontainers_version = "1.19.7" (#31531)
 add aed85746d63 Remove pipeline options override from inside connector
 new e26eb125f9f Merge pull request #31412 Remove SpannerIO pipeline 
options override

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java  | 5 -
 1 file changed, 5 deletions(-)



(beam) 01/01: Merge pull request #31412 Remove SpannerIO pipeline options override

2024-06-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e26eb125f9fabfe02de23ba861219520f2f7ecf2
Merge: d01c24ce64f aed85746d63
Author: Robert Bradshaw 
AuthorDate: Mon Jun 10 09:19:31 2024 -0700

Merge pull request #31412 Remove SpannerIO pipeline options override

Remove pipeline options override from SpannerIO readChangeStream connector

 .../src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java  | 5 -
 1 file changed, 5 deletions(-)




(beam) branch master updated (9e422ca1b0f -> eda6a39558a)

2024-06-10 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 9e422ca1b0f Merge pull request #31555: Fix usage of two different 
StateCache in StreamingDataflowWorker
 add eda6a39558a Allow context managers to be used for setup/teardown. 
(#31458)

No new revisions were added by this update.

Summary of changes:
 CHANGES.md |  4 ++
 .../{map_side_inputs_iter.py => map_context.py}| 53 +++-
 .../snippets/transforms/elementwise/map_test.py| 17 +
 sdks/python/apache_beam/pipeline_test.py   | 29 +
 sdks/python/apache_beam/runners/common.pxd |  2 +
 sdks/python/apache_beam/runners/common.py  | 63 ++-
 sdks/python/apache_beam/transforms/core.py | 72 ++
 sdks/python/scripts/generate_pydoc.sh  |  2 +
 .../transforms/python/elementwise/map.md   | 10 +++
 .../transforms/python/elementwise/pardo.md |  6 +-
 10 files changed, 238 insertions(+), 20 deletions(-)
 copy 
sdks/python/apache_beam/examples/snippets/transforms/elementwise/{map_side_inputs_iter.py
 => map_context.py} (53%)
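
This change lets a context manager supply a DoFn's per-worker resource. The context manager is entered at setup and exited at teardown. That lifecycle can be sketched in plain Python with `contextlib.ExitStack`. `ContextManagedFn` and its methods are hypothetical stand-ins for a DoFn. They are not the Beam API added in this commit.

```python
import contextlib
from typing import Any, Callable, ContextManager, Optional


class ContextManagedFn:
  """Hypothetical DoFn-like object whose resource comes from a context manager."""

  def __init__(self, fn: Callable[[Any, Any], Any],
               make_context: Callable[[], ContextManager[Any]]) -> None:
    self._fn = fn
    self._make_context = make_context
    self._stack: Optional[contextlib.ExitStack] = None
    self._resource: Any = None

  def setup(self) -> None:
    self._stack = contextlib.ExitStack()
    # enter_context() calls __enter__ now and registers __exit__ for later.
    self._resource = self._stack.enter_context(self._make_context())

  def process(self, element: Any) -> Any:
    # The same resource is reused for every element.
    return self._fn(element, self._resource)

  def teardown(self) -> None:
    if self._stack is not None:
      self._stack.close()  # runs the context manager's __exit__
      self._stack = None


events = []


@contextlib.contextmanager
def fake_client():
  events.append('open')
  yield {'prefix': '>'}
  events.append('close')


fn = ContextManagedFn(lambda x, client: client['prefix'] + x, fake_client)
fn.setup()
out = [fn.process(x) for x in ['a', 'b']]
fn.teardown()
print(out, events)
# ['>a', '>b'] ['open', 'close']
```

Using `ExitStack` keeps the exit logic in one place. The context manager is exited exactly once, even when teardown is called on an object whose setup already completed.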



(beam) branch master updated (4098fce785c -> f98a42a3205)

2024-06-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 4098fce785c fix managed doc (#31517)
 add 317b7c67760 Fix issue where manual subsequent label designations were 
ignored.
 new f98a42a3205 Merge pull request #31522 Fix issue where manual 
subsequent label designations were ignored.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/transforms/ptransform.py | 3 +++
 1 file changed, 3 insertions(+)



(beam) 01/01: Merge pull request #31522 Fix issue where manual subsequent label designations were ignored.

2024-06-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f98a42a3205085ba954eb0bf0685ff06b28e4a0c
Merge: 4098fce785c 317b7c67760
Author: Robert Bradshaw 
AuthorDate: Wed Jun 5 16:12:29 2024 -0700

Merge pull request #31522 Fix issue where manual subsequent label 
designations were ignored.

 sdks/python/apache_beam/transforms/ptransform.py | 3 +++
 1 file changed, 3 insertions(+)



(beam) 01/01: Merge pull request #31446 [YAML] mark Combine transform as stable

2024-06-04 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8fb9dc8c2101e22dacc228dc09cd7247bcc2bcbb
Merge: e31e8855ad9 7b6554bb6e6
Author: Robert Bradshaw 
AuthorDate: Tue Jun 4 12:17:32 2024 -0700

Merge pull request #31446 [YAML] mark Combine transform as stable

 examples/notebooks/get-started/try-apache-beam-yaml.ipynb| 2 +-
 sdks/python/apache_beam/yaml/examples/simple_filter_and_combine.yaml | 3 ---
 sdks/python/apache_beam/yaml/integration_tests.py| 1 -
 website/www/site/content/en/documentation/sdks/yaml-combine.md   | 2 +-
 4 files changed, 2 insertions(+), 6 deletions(-)




(beam) branch master updated (e31e8855ad9 -> 8fb9dc8c210)

2024-06-04 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from e31e8855ad9 [#27839][Go SDK] Write pipeline options to a file, instead 
reading from a flag. (#31482)
 add 7b6554bb6e6 [YAML] mark Combine transform as stable
 new 8fb9dc8c210 Merge pull request #31446 [YAML] mark Combine transform as 
stable

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 examples/notebooks/get-started/try-apache-beam-yaml.ipynb| 2 +-
 sdks/python/apache_beam/yaml/examples/simple_filter_and_combine.yaml | 3 ---
 sdks/python/apache_beam/yaml/integration_tests.py| 1 -
 website/www/site/content/en/documentation/sdks/yaml-combine.md   | 2 +-
 4 files changed, 2 insertions(+), 6 deletions(-)



(beam) 01/01: Merge pull request #31480 Fix mangled license headers.

2024-06-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f98b6617b1b28d1461d471391f3fc79403e1031a
Merge: 4ad7037b55f b680fd58ffe
Author: Robert Bradshaw 
AuthorDate: Mon Jun 3 17:05:12 2024 -0700

Merge pull request #31480 Fix mangled license headers.

 sdks/python/apache_beam/yaml/examples/simple_filter.yaml | 4 ++--
 sdks/python/apache_beam/yaml/examples/simple_filter_and_combine.yaml | 4 ++--
 sdks/python/apache_beam/yaml/examples/wordcount_minimal.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/avro.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/bigquery.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/csv.yaml  | 4 ++--
 sdks/python/apache_beam/yaml/tests/java-map.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/join.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/json.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/parquet.yaml  | 4 ++--
 sdks/python/apache_beam/yaml/tests/sql.yaml  | 4 ++--
 sdks/python/apache_beam/yaml/tests/text.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/windowing.yaml| 4 ++--
 13 files changed, 26 insertions(+), 26 deletions(-)




(beam) branch master updated (4ad7037b55f -> f98b6617b1b)

2024-06-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 4ad7037b55f Parse YAML ExpansionService configs directly using SnakeYAML (#31406)
 add b680fd58ffe Fix mangled license headers.
 new f98b6617b1b Merge pull request #31480 Fix mangled license headers.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/yaml/examples/simple_filter.yaml | 4 ++--
 sdks/python/apache_beam/yaml/examples/simple_filter_and_combine.yaml | 4 ++--
 sdks/python/apache_beam/yaml/examples/wordcount_minimal.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/avro.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/bigquery.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/csv.yaml  | 4 ++--
 sdks/python/apache_beam/yaml/tests/java-map.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/join.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/json.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/parquet.yaml  | 4 ++--
 sdks/python/apache_beam/yaml/tests/sql.yaml  | 4 ++--
 sdks/python/apache_beam/yaml/tests/text.yaml | 4 ++--
 sdks/python/apache_beam/yaml/tests/windowing.yaml| 4 ++--
 13 files changed, 26 insertions(+), 26 deletions(-)



(beam) branch master updated (fbe9427b0a9 -> 64d9794f444)

2024-06-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from fbe9427b0a9 Add Trigger_Never to Prepare features check (#31472)
 add 44550eea721 Add test of Select type inference.
 add 5914f272e01 yapf
 add 64d9794f444 Merge pull request #31428 Add test of Select type inference.

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/transforms/ptransform_test.py | 11 +++
 1 file changed, 11 insertions(+)



(beam) branch master updated (64d9794f444 -> 6dd32a0aa65)

2024-06-03 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 64d9794f444 Merge pull request #31428 Add test of Select type inference.
 add f54fda9df73 Add a test for sql dialect.
 add 59b0fdc9a3c Merge branch 'master' into yaml-sql-dialect
 add 6dd32a0aa65 Merge pull request #31276 Add a test for sql dialect.

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/yaml/tests/sql.yaml | 19 +++
 1 file changed, 19 insertions(+)



(beam) branch master updated: Merge pull request #31449 Pass through docs (and configs) for SqlProviders.

2024-05-31 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new daf81143388 Merge pull request #31449 Pass through docs (and configs) for SqlProviders.
daf81143388 is described below

commit daf811433886a5efc2df8f1616aefa9c42706c93
Author: Robert Bradshaw 
AuthorDate: Fri May 31 09:39:07 2024 -0700

Merge pull request #31449 Pass through docs (and configs) for SqlProviders.
---
 sdks/python/apache_beam/yaml/yaml_join.py | 45 +--
 sdks/python/apache_beam/yaml/yaml_provider.py | 27 +---
 2 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_join.py b/sdks/python/apache_beam/yaml/yaml_join.py
index 0b060b6a0ca..04a24642c23 100644
--- a/sdks/python/apache_beam/yaml/yaml_join.py
+++ b/sdks/python/apache_beam/yaml/yaml_join.py
@@ -178,26 +178,51 @@ def _SqlJoinTransform(
     fields: Optional[Dict[str, Any]] = None):
   """Joins two or more inputs using a specified condition.
 
+  For example::
+
+      type: Join
+      input:
+        input1: SomeTransform
+        input2: AnotherTransform
+        input3: YetAnotherTransform
+      config:
+        type: inner
+        equalities:
+          - input1: colA
+            input2: colB
+          - input2: colX
+            input3: colY
+        fields:
+          input1: [colA, colB, colC]
+          input2: {new_name: colB}
+
+  would perform an inner join on the three inputs satisfying the constraints
+  that `input1.colA = input2.colB` and `input2.colX = input3.colY`
+  emitting rows with `colA`, `colB` and `colC` from `input1`, the values of
+  `input2.colB` as a field called `new_name`, and all the fields from `input3`.
+
   Args:
     type: The type of join. Could be a string value in
         ["inner", "left", "right", "outer"] that specifies the type of join to
         be performed. For scenarios with multiple inputs to join where different
         join types are desired, specify the inputs to be outer joined. For
-        example, {outer: [input1, input2]} means that input1 & input2 will be
-        outer joined using the conditions specified, while other inputs will be
-        inner joined.
+        example, ``{outer: [input1, input2]}`` means that `input1` and `input2`
+        will be outer joined using the conditions specified, while other inputs
+        will be inner joined.
     equalities: The condition to join on. A list of sets of columns that should
-        be equal to fulfill the join condition. For the simple scenario to join
-        on the same column across all inputs and the column name is the same,
-        specify the column name as a str.
+        be equal to fulfill the join condition. For the simple scenario of
+        joining on the same column across all inputs where the column name is
+        the same, one can specify the column name as the equality rather than
+        having to list it for every input.
     fields: The fields to be outputted. A mapping with the input alias as the
-        key and the fields in the input to be outputted. The value in the map
+        key and the list of fields in the input to be outputted.
+        The value in the map
         can either be a dictionary with the new field name as the key and the
         original field name as the value (e.g new_field_name: field_name), or a
         list of the fields to be outputted with their original names
-        (e.g [col1, col2, col3]), or an '*' indicating all fields in the input
-        will be outputted. If not specified, all fields from all inputs will be
-        outputted.
+        (e.g ``[col1, col2, col3]``), or an '*' indicating all fields in the
+        input will be outputted. If not specified, all fields from all inputs
+        will be outputted.
   """
 
   _validate_input(pcolls)
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index 52452daff7e..d5f6d03c284 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -424,7 +424,10 @@ class InlineProvider(Provider):
     return self._transform_factories.keys()
 
   def config_schema(self, typ):
-    factory = self._transform_factories[typ]
+    return self.config_schema_from_callable(self._transform_factories[typ])
+
+  @classmethod
+  def config_schema_from_callable(cls, factory):
     if isinstance(factory, type) and issubclass(factory, beam.PTransform):
       # https://bugs.python.org/issue40897
       params = dict(inspect.signature(factory.__init__).parameters)
@@ -442,7 +445,7 @@ class InlineProvider(Provider):
 
     docs = {
         param.arg_name: param.description
-   
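The join semantics in the `_SqlJoinTransform` docstring can be made concrete with a small in-memory sketch in plain Python. This is an illustration under stated assumptions, not Beam's implementation (the transform's name suggests the real join is handed off to a SQL transform). `inner_join` is a hypothetical helper. Rows are plain dicts with hashable values, and only the inner join type is modeled. The string shorthand for `equalities` is not handled, and when two inputs produce the same field name, the later input silently overwrites it.

```python
import itertools
from typing import Any, Dict, List, Optional, Union

Row = Dict[str, Any]
# A per-input field spec: '*', a list of names, or {new_name: original_name}.
FieldSpec = Union[str, List[str], Dict[str, str]]


def inner_join(
        inputs: Dict[str, List[Row]],
        equalities: List[Dict[str, str]],
        fields: Optional[Dict[str, FieldSpec]] = None) -> List[Row]:
    """Hypothetical in-memory model of a YAML Join with type: inner."""
    fields = fields or {}
    aliases = list(inputs)
    joined: List[Row] = []
    # Nested-loop join: consider every combination of one row per input.
    for combo in itertools.product(*(inputs[alias] for alias in aliases)):
        rows = dict(zip(aliases, combo))
        # Each equality maps alias -> column; all referenced values must agree.
        if any(len({rows[alias][col] for alias, col in eq.items()}) != 1
               for eq in equalities):
            continue
        out: Row = {}
        for alias in aliases:
            spec = fields.get(alias, '*')  # Unlisted inputs keep all fields.
            row = rows[alias]
            if spec == '*':
                out.update(row)
            elif isinstance(spec, dict):
                # {new_name: original_name} renames the selected fields.
                out.update({new: row[old] for new, old in spec.items()})
            else:
                out.update({col: row[col] for col in spec})
        joined.append(out)
    return joined
```

Run on data shaped like the docstring example, the sketch does the following:

- keeps `colA`, `colB` and `colC` from `input1`;
- emits `input2.colB` as `new_name`;
- passes through every field of `input3`.

A nested loop over all row combinations is the simplest way to state the semantics. Its cost grows with the product of the input sizes, so it is only suitable for illustration.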

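The yaml_provider.py change above moves signature inspection out of `config_schema(typ)` and into a `config_schema_from_callable(factory)` classmethod. This lets a schema be derived directly from a callable, rather than only from a registered type name. What follows is a stdlib-only sketch of that inspection step. `config_params_from_callable` is a hypothetical name, and it returns raw annotations rather than a Beam schema. Dropping `self` and skipping `*args`/`**kwargs` are assumptions about what the truncated part of the diff does.

```python
import inspect
from typing import Any, Callable, Dict


def config_params_from_callable(factory: Callable[..., Any]) -> Dict[str, Any]:
    """Hypothetical helper: map each config parameter to its annotation."""
    if isinstance(factory, type):
        # For classes, inspect __init__ directly (the diff does this for
        # PTransform subclasses, citing https://bugs.python.org/issue40897)
        # and drop the implicit `self` parameter.
        params = dict(inspect.signature(factory.__init__).parameters)
        params.pop('self', None)
    else:
        params = dict(inspect.signature(factory).parameters)
    return {
        name: (Any if p.annotation is inspect.Parameter.empty else p.annotation)
        for name, p in params.items()
        # *args / **kwargs cannot be expressed as named config fields.
        if p.kind not in (inspect.Parameter.VAR_POSITIONAL,
                          inspect.Parameter.VAR_KEYWORD)
    }
```

Taking a callable rather than a type name means the same helper can serve providers whose factories are not stored in `_transform_factories`.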
(beam) branch master updated: Add docs for YAML AssertThat. (#31448)

2024-05-29 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 80d85aa38ff Add docs for YAML AssertThat. (#31448)
80d85aa38ff is described below

commit 80d85aa38ff91699a5123f14d5c5df96d826140c
Author: Robert Bradshaw 
AuthorDate: Wed May 29 15:59:26 2024 -0700

Add docs for YAML AssertThat. (#31448)

This is the first transform in the (alphabetical) list, so it'd
be good to not have it empty.

Also produce slightly nicer examples for repeated arguments.
---
 sdks/python/apache_beam/yaml/generate_yaml_docs.py | 24 +++--
 sdks/python/apache_beam/yaml/yaml_provider.py  | 25 +-
 2 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py
index b11062cce4d..4719bc3e66a 100644
--- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py
+++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py
@@ -28,6 +28,18 @@ from apache_beam.yaml import json_utils
 from apache_beam.yaml import yaml_provider
 
 
+def _singular(name):
+  # Simply removing an 's' (or 'es', or 'ies', ...) may result in surprising
+  # manglings. Better to play it safe and leave a correctly-spelled plural
+  # than a botched singular in our examples configs.
+  return {
+      'args': 'arg',
+      'attributes': 'attribute',
+      'elements': 'element',
+      'fields': 'field',
+  }.get(name, name)
+
+
 def _fake_value(name, beam_type):
   type_info = beam_type.WhichOneof("type_info")
   if type_info == "atomic_type":
@@ -38,9 +50,17 @@ def _fake_value(name, beam_type):
     else:
       return name
   elif type_info == "array_type":
-    return [_fake_value(name, beam_type.array_type.element_type), '...']
+    return [
+        _fake_value(_singular(name), beam_type.array_type.element_type),
+        _fake_value(_singular(name), beam_type.array_type.element_type),
+        '...'
+    ]
   elif type_info == "iterable_type":
-    return [_fake_value(name, beam_type.iterable_type.element_type), '...']
+    return [
+        _fake_value(_singular(name), beam_type.iterable_type.element_type),
+        _fake_value(_singular(name), beam_type.iterable_type.element_type),
+        '...'
+    ]
   elif type_info == "map_type":
     if beam_type.map_type.key_type.atomic_type == schema_pb2.STRING:
       return {
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index 5f53302028c..52452daff7e 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -557,7 +557,30 @@ def dicts_to_rows(o):
 
 class YamlProviders:
   class AssertEqual(beam.PTransform):
-    def __init__(self, elements):
+    """Asserts that the input contains exactly the elements provided.
+
+    This is primarily used for testing; it will cause the entire pipeline to
+    fail if the input to this transform is not exactly the set of `elements`
+    given in the config parameter.
+
+    As with Create, YAML/JSON-style mappings are interpreted as Beam rows,
+    e.g.::
+
+        type: AssertEqual
+        input: SomeTransform
+        config:
+          elements:
+             - {a: 0, b: "foo"}
+             - {a: 1, b: "bar"}
+
+    would ensure that `SomeTransform` produced exactly two elements with values
+    `(a=0, b="foo")` and `(a=1, b="bar")` respectively.
+
+    Args:
+        elements: The set of elements that should belong to the PCollection.
+            YAML/JSON-style mappings will be interpreted as Beam rows.
+    """
+    def __init__(self, elements: Iterable[Any]):
       self._elements = elements
 
     def expand(self, pcoll):

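The comment in `_singular` argues for an explicit lookup table over suffix stripping. A quick stdlib-only comparison makes the failure mode concrete:

- `naive_singular` is a hypothetical strawman.
- `SINGULARS` copies the table from the diff.
- `fake_list` only mimics the new "two examples, then `'...'`" shape for string elements. It is not Beam's `_fake_value`.

```python
def naive_singular(name: str) -> str:
    # Strawman: strip a trailing 's', which mangles words like 'address'.
    return name[:-1] if name.endswith('s') else name


# Same entries as the table added in generate_yaml_docs.py.
SINGULARS = {
    'args': 'arg',
    'attributes': 'attribute',
    'elements': 'element',
    'fields': 'field',
}


def singular(name: str) -> str:
    # Unknown names come back unchanged: a correct plural beats a bad singular.
    return SINGULARS.get(name, name)


def fake_list(name: str) -> list:
    # Mimics the new example shape: two singular placeholders, then '...'.
    return [singular(name), singular(name), '...']
```

The trade-off is coverage: any plural missing from the table is left as-is. The comment in the diff explicitly prefers that outcome.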

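The `AssertEqual` docstring says the input must contain *exactly* the configured elements. One reading of "exactly" is multiset equality: order is ignored, but missing, extra or duplicated elements fail. The sketch below checks that for plain Python values. It canonicalizes YAML/JSON-style mappings with `json.dumps(..., sort_keys=True)` so that they can be counted. `contains_exactly` is hypothetical and is not the check Beam itself performs.

```python
import json
from collections import Counter
from typing import Any, Iterable


def _canonical(element: Any) -> str:
    # Dicts are unhashable; a key-sorted JSON string is an order-independent key.
    return json.dumps(element, sort_keys=True)


def contains_exactly(actual: Iterable[Any], expected: Iterable[Any]) -> bool:
    """Hypothetical check: same elements with the same multiplicities."""
    return Counter(map(_canonical, actual)) == Counter(map(_canonical, expected))
```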

(beam) branch master updated (0b5ffd7d153 -> 19630e576fe)

2024-05-29 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 0b5ffd7d153 Add SDK capability to detect if the SDK Fn Harness data channel is busy or not (#31420)
 add 19630e576fe Add in-memory variants of side inputs. (#31232)

No new revisions were added by this update.

Summary of changes:
 CHANGES.md |   1 +
 .../java/org/apache/beam/sdk/transforms/View.java  | 198 -
 .../apache/beam/sdk/values/PCollectionViews.java   | 488 +
 .../org/apache/beam/sdk/transforms/ViewTest.java   | 145 ++
 4 files changed, 812 insertions(+), 20 deletions(-)



(beam) branch master updated (8d77c8fad07 -> 0b5ffd7d153)

2024-05-29 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 8d77c8fad07 Add try-excepts around data sampler encoding (#31396)
 add 0b5ffd7d153 Add SDK capability to detect if the SDK Fn Harness data channel is busy or not (#31420)

No new revisions were added by this update.

Summary of changes:
 sdks/go/pkg/beam/core/runtime/exec/datasource.go  | 15 +--
 sdks/go/pkg/beam/core/runtime/exec/datasource_test.go |  8 ++--
 sdks/go/pkg/beam/core/runtime/graphx/translate.go | 12 +++-
 sdks/go/pkg/beam/core/runtime/harness/harness.go  |  9 +
 sdks/go/pkg/beam/core/runtime/harness/monitoring.go   |  8 
 .../beam/sdk/fn/data/BeamFnDataInboundObserver.java   | 14 ++
 .../apache/beam/sdk/util/construction/Environments.java   |  1 +
 .../beam/fn/harness/control/ProcessBundleHandler.java |  3 +++
 .../python/apache_beam/runners/worker/bundle_processor.py |  9 +
 sdks/python/apache_beam/runners/worker/sdk_worker.py  |  5 -
 sdks/python/apache_beam/runners/worker/sdk_worker_test.py |  4 ++--
 sdks/python/apache_beam/transforms/environments.py|  1 +
 12 files changed, 69 insertions(+), 20 deletions(-)



(beam) branch master updated (b1a6eb06051 -> 8b33e1f65c3)

2024-05-29 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from b1a6eb06051 [YAML] Fix simple YAML mappings type hinting (#31427)
 add 6842136e0c9 Add SDK capability to detect if the SDK Fn Harness data channel is busy.
 add ad841c6004f Regenerate Go protos.
 new 8b33e1f65c3 Merge pull request #31442 SDK protocol to detect if the SDK Fn Harness data channel is busy

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/model/fn_execution/v1/beam_fn_api.proto   |5 +
 .../beam/model/pipeline/v1/beam_runner_api.proto   |5 +
 .../beam/model/fnexecution_v1/beam_fn_api.pb.go| 1582 ++
 .../beam/model/pipeline_v1/beam_runner_api.pb.go   | 3245 ++--
 .../model/pipeline_v1/external_transforms.pb.go|4 +
 5 files changed, 2592 insertions(+), 2249 deletions(-)



(beam) 01/01: Merge pull request #31442 SDK protocol to detect if the SDK Fn Harness data channel is busy

2024-05-29 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8b33e1f65c38d7650118858d632d393bda21329a
Merge: b1a6eb06051 ad841c6004f
Author: Robert Bradshaw 
AuthorDate: Wed May 29 10:28:36 2024 -0700

Merge pull request #31442 SDK protocol to detect if the SDK Fn Harness data channel is busy

 .../beam/model/fn_execution/v1/beam_fn_api.proto   |5 +
 .../beam/model/pipeline/v1/beam_runner_api.proto   |5 +
 .../beam/model/fnexecution_v1/beam_fn_api.pb.go| 1582 ++
 .../beam/model/pipeline_v1/beam_runner_api.pb.go   | 3245 ++--
 .../model/pipeline_v1/external_transforms.pb.go|4 +
 5 files changed, 2592 insertions(+), 2249 deletions(-)



(beam) branch master updated (49a4290426d -> b1a6eb06051)

2024-05-29 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 49a4290426d Add options to specify read and write http timeout for gcs as well as lower batching limit for rewrite operations which are copying. (#31410)
 add b1a6eb06051 [YAML] Fix simple YAML mappings type hinting (#31427)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/yaml/yaml_combine.py  |  2 +-
 sdks/python/apache_beam/yaml/yaml_mapping.py  | 23 ++---
 sdks/python/apache_beam/yaml/yaml_mapping_test.py | 30 +++
 3 files changed, 50 insertions(+), 5 deletions(-)



(beam) branch master updated: Remove bad dialect option. (#31429)

2024-05-28 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 28443f803c8 Remove bad dialect option. (#31429)
28443f803c8 is described below

commit 28443f803c8003b7bb466d9d66045df18cefe504
Author: Robert Bradshaw 
AuthorDate: Tue May 28 16:13:58 2024 -0700

Remove bad dialect option. (#31429)
---
 sdks/python/apache_beam/yaml/tests/sql.yaml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sdks/python/apache_beam/yaml/tests/sql.yaml b/sdks/python/apache_beam/yaml/tests/sql.yaml
index 34a6b80775a..87b84a71aae 100644
--- a/sdks/python/apache_beam/yaml/tests/sql.yaml
+++ b/sdks/python/apache_beam/yaml/tests/sql.yaml
@@ -49,7 +49,6 @@ pipelines:
 
         - type: Sql
           config:
-            dialect: X
             query:
               "SELECT a, sum(b) as s FROM PCOLLECTION GROUP BY a"
 



(beam) branch master updated (e488f41b9bd -> 305e75359ad)

2024-05-28 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from e488f41b9bd Memoize some dataframes analysis operations. (#31377)
 add 305e75359ad Better mixed Java/Python error messages for external transforms. (#31284)

No new revisions were added by this update.

Summary of changes:
 .../sdk/expansion/service/ExpansionService.java|  7 +++--
 sdks/python/apache_beam/transforms/external.py | 32 +-
 .../python/apache_beam/transforms/external_test.py | 30 
 sdks/python/apache_beam/yaml/tests/sql.yaml|  1 +
 4 files changed, 67 insertions(+), 3 deletions(-)



(beam) branch master updated (faaa68c1a1c -> 6197657c9c3)

2024-05-08 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from faaa68c1a1c Fix lint warning: JobId vs JobID (#31197)
 add ace86e2d518 [YAML] Add the ability to pre-process yaml files with jinja2.
 add 95ac1306a56 Allow including from arbitrary Beam filesystems.
 add 6bcf02b9218 Add tests for yaml main.
 add 13c72e82abe Unconditionally invoke jinja and update release notes.
 add 2e662d54347 Make test infra happy.
 add 53e85aeb186 Use Sphinx-compatible jinja2.
 add 94a6bbb40a8 Remove flags_as_jinja_variables.
 add 2dcb12f4f3f Actually fix RowCoder tests.
 add 5e058620c95 Also record template and jinja variables in display data.
 add 037704b6f7d Merge branch 'master' into yaml-jinja
 add 6197657c9c3 Merge pull request #30976 [YAML] Add the ability to pre-process yaml files with jinja2.

No new revisions were added by this update.

Summary of changes:
 CHANGES.md   |  2 +
 sdks/python/apache_beam/typehints/schemas.py |  7 +++
 sdks/python/apache_beam/yaml/main.py | 64 +--
 sdks/python/apache_beam/yaml/main_test.py| 76 
 sdks/python/scripts/run_pylint.sh|  2 +
 sdks/python/setup.py |  1 +
 6 files changed, 137 insertions(+), 15 deletions(-)
 create mode 100644 sdks/python/apache_beam/yaml/main_test.py



(beam) branch master updated (ebb1465942e -> c0c8d7852a2)

2024-05-07 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from ebb1465942e Add jinja2 as a dependency for yaml preprocessing. (#31164)
 add c0c8d7852a2 Restore pandas version constraint. (#31213)

No new revisions were added by this update.

Summary of changes:
 sdks/python/setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(beam) branch master updated (e2e0ba67e27 -> ebb1465942e)

2024-05-07 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from e2e0ba67e27 Merge pull request #31208: Updates the error message related to schema-transform discovery
 add ebb1465942e Add jinja2 as a dependency for yaml preprocessing. (#31164)

No new revisions were added by this update.

Summary of changes:
 sdks/python/setup.py | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)



svn commit: r68923 - in /release/beam: 2.55.0/ 2.55.1/ 2.56.0/ 2.56.0/python/

2024-05-02 Thread robertwb
Author: robertwb
Date: Thu May  2 16:31:30 2024
New Revision: 68923

Log:
Adding artifacts for the 2.56.0 release and removing old artifacts

Added:
release/beam/2.56.0/
release/beam/2.56.0/apache-beam-2.56.0-source-release.zip   (with props)
release/beam/2.56.0/apache-beam-2.56.0-source-release.zip.asc
release/beam/2.56.0/apache-beam-2.56.0-source-release.zip.sha512
release/beam/2.56.0/python/
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-macosx_10_9_x86_64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-macosx_10_9_x86_64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-macosx_10_9_x86_64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-win32.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-win32.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-win32.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-win_amd64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-win_amd64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp310-cp310-win_amd64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-macosx_10_9_x86_64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-macosx_10_9_x86_64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-macosx_10_9_x86_64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-win32.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-win32.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-win32.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-win_amd64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-win_amd64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp311-cp311-win_amd64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp38-cp38-macosx_10_9_x86_64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp38-cp38-macosx_10_9_x86_64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp38-cp38-macosx_10_9_x86_64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.asc
release/beam/2.56.0/python/apache_beam-2.56.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.sha512
release/beam/2.56.0/python/apache_beam-2.56.0-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl   (with props)
release/beam/2.56.0/python/apache_beam-2.56.0-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl.asc
release/beam

(beam) branch master updated (d2db322c081 -> 670e56fb1b4)

2024-05-01 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from d2db322c081 Fix ISM-expecting tests by explicilty requisting random access. (#31149)
 add 670e56fb1b4 FnAPI proto changes for ordered list state. (#31092)

No new revisions were added by this update.

Summary of changes:
 .../beam/model/fn_execution/v1/beam_fn_api.proto   | 37 ++
 1 file changed, 37 insertions(+)



(beam) branch master updated: Fix ISM-expecting tests by explicilty requisting random access. (#31149)

2024-05-01 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new d2db322c081 Fix ISM-expecting tests by explicilty requisting random access. (#31149)
d2db322c081 is described below

commit d2db322c081be810c0f756b6dbabc2208ffedb35
Author: Robert Bradshaw 
AuthorDate: Wed May 1 11:24:02 2024 -0700

Fix ISM-expecting tests by explicilty requisting random access. (#31149)
---
 .../beam/runners/dataflow/DataflowPipelineTranslatorTest.java   | 4 +++-
 .../apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java | 6 --
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 3b2c7aa0d8e..0f3451bbb78 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -948,7 +948,9 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Pipeline pipeline = Pipeline.create(options);
 
     final PCollectionView> view =
-        pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23)).apply(View.asList());
+        pipeline
+            .apply("CreateSideInput", Create.of(11, 13, 17, 23))
+            .apply(View.asList().withRandomAccess());
 
     pipeline
         .apply("CreateMainInput", Create.of(29, 31))
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
index 42bc9eb5389..0ce92f9d932 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
@@ -678,7 +678,9 @@ public class IsmSideInputReaderTest {
 KV.of(2L, valueInGlobalWindow(62L)));
 
     final PCollectionView> view =
-        Pipeline.create().apply(Create.empty(VarLongCoder.of())).apply(View.asList());
+        Pipeline.create()
+            .apply(Create.empty(VarLongCoder.of()))
+            .apply(View.asList().withRandomAccess());
 
     Source sourceA = initInputFile(fromKvsForList(firstElements), ismCoder);
     Source sourceB = initInputFile(fromKvsForList(secondElements), ismCoder);
@@ -736,7 +738,7 @@ public class IsmSideInputReaderTest {
         Pipeline.create()
             .apply(Create.empty(VarLongCoder.of()))
             .apply(Window.into(FixedWindows.of(Duration.millis(10
-            .apply(View.asList());
+            .apply(View.asList().withRandomAccess());
 
     Source sourceA = initInputFile(fromKvsForList(concat(firstElements, secondElements)), ismCoder);
     Source sourceB = initInputFile(fromKvsForList(thirdElements), ismCoder);



(beam) branch master updated (44e2abea788 -> dc5841d0ed0)

2024-05-01 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 44e2abea788 Pin tensorflow version in notebook (#31145)
 add dc5841d0ed0 [YAML] Increase re-use of providers with implicitly overlapping transforms. (#30793)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/yaml/yaml_provider.py  |  61 ++-
 sdks/python/apache_beam/yaml/yaml_provider_test.py | 120 +
 sdks/python/apache_beam/yaml/yaml_transform.py |  50 -
 .../apache_beam/yaml/yaml_transform_scope_test.py  |   2 +-
 .../apache_beam/yaml/yaml_transform_unit_test.py   |   3 +-
 5 files changed, 228 insertions(+), 8 deletions(-)
 create mode 100644 sdks/python/apache_beam/yaml/yaml_provider_test.py



(beam) branch master updated: Optimise View.asList() side inputs for iterating rather than for indexing. (#31087)

2024-04-30 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 7f7bc3e9ce5 Optimise View.asList() side inputs for iterating rather than for indexing. (#31087)
7f7bc3e9ce5 is described below

commit 7f7bc3e9ce54dc6f732e20bef008ecdef571033e
Author: Robert Bradshaw 
AuthorDate: Tue Apr 30 13:58:03 2024 -0700

Optimise View.asList() side inputs for iterating rather than for indexing. (#31087)

The current implementation is, essentially, a distributed hashmap from
integer keys to the list contents, mediated by each upstream worker starting
at a random value to minimize overlaps and emitting sufficient metadata to map
this onto the contiguous range [0, N). This provides optimal *random-access*
performance, but very poor *iteration* performance (essentially having to do
a key lookup for every advance, and as the keys are hashed and distributed
rather than clustered numerically, there is little to no amortization in these
lookups for adjacent items).

Given that most uses for List side inputs are merely to gather a collection
of values (the user has no control over the ordering when materialized) and
the high costs of providing random access, this is probably the wrong tradeoff
for most pipelines.

This is an update-incompatible change and so has been guarded by the
update compatibility version flag. The old behavior can be explicitly
asked for via a new AsList#withRandomAccess() method.
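A toy Python model of the tradeoff described in this commit message follows. It is not Beam's implementation: the class names, the page size, and counting "fetches" as the unit of cost are assumptions made for illustration. The random-access view keeps one hashed key per index, so `get(i)` costs a single fetch but iteration costs one fetch per element; the iterable view stores elements contiguously and reads them a page at a time, so iteration is cheap but `get(i)` has to scan from the start.

```python
from typing import Dict, Iterator, List


class RandomAccessListView:
    """Toy model: each element is stored under its own hashed index key."""

    def __init__(self, values: List[int]) -> None:
        self._store: Dict[int, int] = dict(enumerate(values))
        self._size = len(values)
        self.fetches = 0  # keyed lookups issued against the store

    def get(self, index: int) -> int:
        self.fetches += 1  # any index costs exactly one lookup
        return self._store[index]

    def __iter__(self) -> Iterator[int]:
        # Keys are hashed, so neighbouring indices share no fetch:
        # iteration degenerates into one lookup per element.
        for i in range(self._size):
            yield self.get(i)


class IterableListView:
    """Toy model: elements are stored contiguously and read in pages."""

    def __init__(self, values: List[int], page_size: int = 100) -> None:
        self._values = list(values)
        self._page_size = page_size
        self.fetches = 0  # pages read from the store

    def __iter__(self) -> Iterator[int]:
        for start in range(0, len(self._values), self._page_size):
            self.fetches += 1  # one fetch returns a whole page
            yield from self._values[start:start + self._page_size]

    def get(self, index: int) -> int:
        # There is no index, so scan pages from the beginning.
        for i, value in enumerate(self):
            if i == index:
                return value
        raise IndexError(index)
```

With 1,000 elements and pages of 100, iterating the random-access view costs 1,000 fetches while the iterable view costs 10; `get(999)` reverses the picture (1 fetch versus 10).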
---
 CHANGES.md |   5 +
 .../apache/beam/sdk/options/StreamingOptions.java  |  20 ++
 .../java/org/apache/beam/sdk/transforms/View.java  |  57 +-
 .../apache/beam/sdk/values/PCollectionViews.java   | 210 -
 .../org/apache/beam/sdk/transforms/ViewTest.java   |  33 
 .../CreatePCollectionViewTranslationTest.java  |   2 +-
 .../sdk/util/construction/MorePipelineTest.java|   2 +-
 7 files changed, 323 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ec7fbe45668..66e40126236 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -71,6 +71,11 @@
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* Java's View.asList() side inputs are now optimized for iterating rather than
+  indexing when in the global window.
+  This new implementation still supports all (immutable) List methods as before,
+  but some of the random access methods like get() and size() will be slower.
+  To use the old implementation one can use View.asList().withRandomAccess().
 
 ## Deprecations
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java
index e389ab89cf9..8065e0a40cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.sdk.options;
 
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Options used to configure streaming. */
@@ -41,4 +45,20 @@ public interface StreamingOptions extends ApplicationNameOptions, PipelineOption
   String getUpdateCompatibilityVersion();
 
   void setUpdateCompatibilityVersion(@Nullable String updateCompatibilityVersion);
+
+  static boolean updateCompatibilityVersionLessThan(PipelineOptions options, String version) {
+    if (options == null) {
+      return false;
+    }
+    String updateCompatibilityVersion =
+        options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
+    if (updateCompatibilityVersion == null) {
+      return false;
+    }
+    List<String> requestedVersion = Arrays.asList(updateCompatibilityVersion.split("\\."));
+    List<String> targetVersion = Arrays.asList(version.split("\\."));
+    return Comparators.lexicographical(Comparator.naturalOrder())
+            .compare(requestedVersion, targetVersion)
+        < 0;
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 22c42249678..ca04542b372 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -28,8 +28,10 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.optio
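The new `updateCompatibilityVersionLessThan` helper splits both version strings on `.` and compares the resulting lists with Guava's `Comparators.lexicographical` over the components' natural (`String`) order. The Python sketch below mirrors that rule; `version_less_than` and `numeric_version_less_than` are illustrative names, not Beam APIs, and the single `None` check stands in for both of the Java method's null checks. The numeric variant is included only for contrast: because components are compared as strings, `"2.9.0"` does not sort below `"2.10.0"` under the string rule.

```python
from typing import Optional


def version_less_than(requested: Optional[str], target: str) -> bool:
    """String-wise component comparison, as in the Java helper (a sketch)."""
    if requested is None:
        # No update-compatibility version requested: never "older".
        return False
    # Python compares lists element by element, and a strict prefix
    # sorts first, like Guava's Comparators.lexicographical.
    return requested.split(".") < target.split(".")


def numeric_version_less_than(requested: Optional[str], target: str) -> bool:
    """Variant that compares components as integers instead of strings."""
    if requested is None:
        return False
    return [int(p) for p in requested.split(".")] < [int(p) for p in target.split(".")]
```

For example, `version_less_than("2.55.0", "2.56.0")` is true, while `version_less_than("2.9.0", "2.10.0")` is false because `"9"` sorts after `"10"` as a string.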

(beam) branch master updated (45e78572e8f -> 83a90f2bbb4)

2024-04-24 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


 from 45e78572e8f python sdk: fix several bugs regarding avto <-> beam schema conversion (#30770)
 add 5cce8d79925 Add usage counters to ListView side inputs.
 new 83a90f2bbb4 Merge pull request #31083 Add usage counters to ListView side inputs.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/beam/sdk/values/PCollectionViews.java   | 52 +++---
 1 file changed, 46 insertions(+), 6 deletions(-)



(beam) 01/01: Merge pull request #31083 Add usage counters to ListView side inputs.

2024-04-24 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 83a90f2bbb4ab21a468658e4182c024c4cc81da8
Merge: 45e78572e8f 5cce8d79925
Author: Robert Bradshaw 
AuthorDate: Wed Apr 24 09:10:47 2024 -0700

Merge pull request #31083 Add usage counters to ListView side inputs.

 .../apache/beam/sdk/values/PCollectionViews.java   | 52 +++---
 1 file changed, 46 insertions(+), 6 deletions(-)



(beam) branch master updated (fd1003034ff -> 98b1f03125c)

2024-04-19 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from fd1003034ff Add ScaleToGaussian TFTransform (#31026)
 add 98b1f03125c Better and earlier error for missing service loader metadata. (#31042)

No new revisions were added by this update.

Summary of changes:
 .../dataflow/worker/DataflowBatchWorkerHarness.java |  3 +++
 .../dataflow/worker/StreamingDataflowWorker.java|  3 +++
 .../src/main/java/org/apache/beam/sdk/Pipeline.java |  2 ++
 .../beam/sdk/util/construction/CoderTranslation.java| 17 +
 .../main/java/org/apache/beam/fn/harness/FnHarness.java |  2 ++
 5 files changed, 27 insertions(+)



(beam) branch master updated: Update documentation of @SchemaFieldNumber (#30273) (#30277)

2024-04-18 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 61153bbda6a Update documentation of @SchemaFieldNumber (#30273) (#30277)
61153bbda6a is described below

commit 61153bbda6abd5e2d6798544d5f5a81f08d15ee4
Author: bzablocki 
AuthorDate: Thu Apr 18 17:01:50 2024 +0200

Update documentation of @SchemaFieldNumber (#30273) (#30277)
---
 .../org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java
index 32110395f60..1bfcda7270b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java
@@ -31,16 +31,19 @@ import javax.annotation.Nonnull;
  * specified index. There cannot be "gaps" in field numbers, or schema inference will fail. If used,
  * all fields (or getters in the case of a bean) must be annotated.
  *
+ * The annotation takes a String as an argument, but this has to be an Integer-parsable String.
+ * Otherwise the pipeline will throw a RuntimeException.
+ *
  * For example, say we have a Java POJO with a field that we want in our schema but under a
  * different name:
  *
  * 
  *  {@literal @}DefaultSchema(JavaFieldSchema.class)
  *   class MyClass {
- * {@literal @}SchemaFieldNumber(1)
+ * {@literal @}SchemaFieldNumber("1")
  * public String user;
  *
- *{@literal @}SchemaFieldNumber(0)
+ *{@literal @}SchemaFieldNumber("0")
  * public int ageInYears;
  *   }
  * 
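The Javadoc above states two rules: each `@SchemaFieldNumber` value must be an integer-parsable string, and the numbers must not have gaps. A hypothetical Python helper that checks those rules and orders fields accordingly is sketched below. It is not Beam's schema-inference code; rejecting duplicate numbers is an extra assumption, and Python's `int()` is somewhat more lenient than Java's integer parsing (it accepts surrounding whitespace, for example).

```python
from typing import Dict, List


def order_fields(field_numbers: Dict[str, str]) -> List[str]:
    """Return field names ordered by their string-valued field numbers.

    Hypothetical helper: every value must parse as an integer, and the
    numbers must form the gap-free range 0..n-1.
    """
    parsed: Dict[int, str] = {}
    for name, number in field_numbers.items():
        try:
            index = int(number)
        except ValueError:
            raise RuntimeError(f"field {name!r}: {number!r} is not an integer") from None
        if index in parsed:
            # Assumption: a number may only be used once.
            raise RuntimeError(f"field number {index} used twice")
        parsed[index] = name
    if sorted(parsed) != list(range(len(parsed))):
        raise RuntimeError(f"field numbers must be 0..{len(parsed) - 1} with no gaps")
    return [parsed[i] for i in range(len(parsed))]
```

Applied to the Javadoc's `MyClass` example, `order_fields({"user": "1", "ageInYears": "0"})` returns `["ageInYears", "user"]`.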



(beam) branch master updated: Add some metrics for CoGBK profiling. (#30979)

2024-04-16 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6bca71070e9 Add some metrics for CoGBK profiling. (#30979)
6bca71070e9 is described below

commit 6bca71070e96b56b781600e8833a72cea329b1a1
Author: Robert Bradshaw 
AuthorDate: Tue Apr 16 14:51:29 2024 -0700

Add some metrics for CoGBK profiling. (#30979)
---
 .../org/apache/beam/sdk/transforms/join/CoGbkResult.java  | 15 +++
 1 file changed, 15 insertions(+)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 8f7898fc428..2e26d13da54 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -32,6 +32,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.util.common.Reiterator;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
@@ -73,6 +75,10 @@ public class CoGbkResult {
 
   private static final Logger LOG = LoggerFactory.getLogger(CoGbkResult.class);
 
+  private Counter keyCount = Metrics.counter(CoGbkResult.class, "cogbk-keys");
+
+  private Counter largeKeyCount = Metrics.counter(CoGbkResult.class, "cogbk-large-keys");
+
   /**
* A row in the {@link PCollection} resulting from a {@link CoGroupByKey} transform. Currently,
* this row must fit into memory.
@@ -91,6 +97,7 @@ public class CoGbkResult {
   int inMemoryElementCount,
   int minElementsPerTag) {
 this.schema = schema;
+keyCount.inc();
 List> valuesByTag = new ArrayList<>();
 for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
   valuesByTag.add(new ArrayList<>());
@@ -103,6 +110,7 @@ public class CoGbkResult {
 while (taggedIter.hasNext()) {
   if (elementCount++ >= inMemoryElementCount) {
 // Let the tails be lazy.
+largeKeyCount.inc();
 break;
   }
   RawUnionValue value = taggedIter.next();
@@ -636,6 +644,10 @@ public class CoGbkResult {
 }
 
 void finish() {
+  Metrics.counter(
+  CoGbkResult.class,
+  this.tail == null ? "cogbk-small-iterables" : "cogbk-large-iterables")
+  .inc();
   finished = true;
 }
 
@@ -838,8 +850,11 @@ public class CoGbkResult {
  // We got to the end of the iterable, update the shared set of values with those sets that
   // were small enough to cache.
   if (!sharedSeenEnd[0]) {
+Counter smallIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-small-iterables");
+Counter largeIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-large-iterables");
 for (int i = 0; i < sharedValueMap.size(); i++) {
   List localValues = localValueMap.get(i);
+  (localValues == null ? largeIterablesCount : smallIterablesCount).inc();
   sharedValueMap.set(
  i, localValues != null ? localValues : simpleFilteringIterable(taggedIterable, i));
 }
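The counters added in this commit classify each key by whether its grouped values fit in the in-memory buffer (`inMemoryElementCount`). The Python sketch below reproduces only that classification, with `collections.Counter` standing in for Beam's `Metrics.counter`; the function name and its dictionary input are assumptions for illustration, and the per-iterable small/large counters are left out.

```python
from collections import Counter
from typing import Dict, Iterable


def profile_cogbk(groups: Dict[str, Iterable[object]], in_memory_limit: int) -> Counter:
    """Count keys, and keys whose values overflow the in-memory buffer."""
    counts: Counter = Counter()
    for values in groups.values():
        counts["cogbk-keys"] += 1
        element_count = 0
        for _ in values:
            if element_count >= in_memory_limit:
                # Like the Java loop: stop buffering and leave the tail lazy.
                counts["cogbk-large-keys"] += 1
                break
            element_count += 1
    return counts
```

A key counts as large only when it has more than `in_memory_limit` values, matching the `elementCount++ >= inMemoryElementCount` check in the diff.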



(beam) branch master updated (eb7ad46fcb7 -> 63ebda005d0)

2024-04-15 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from eb7ad46fcb7 remove CLOUDSDK env variable , not needed here (#30970)
 add fc7b955218c Reapply "[BEAM-30531] Automatically execute unbounded pipelines in streaming mode. (#30533)" (#30706)
 add 042284afd4e Add experiment for disabling auto-streaming.
 add c165f8af134 Correct release notes.
 add 63ebda005d0 Merge pull request #30959 Automatically execute unbounded pipelines in streaming mode.

No new revisions were added by this update.

Summary of changes:
 CHANGES.md |  1 +
 .../runners/dataflow/dataflow_runner.py| 27 ++
 .../runners/dataflow/dataflow_runner_test.py   | 61 ++
 3 files changed, 89 insertions(+)



(beam) branch master updated (dac2bd87b2c -> ce364e128b2)

2024-04-12 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


 from dac2bd87b2c Merge pull request #30880 [YAML] Adds several tests exercising the cross-language capabilities.
 add 179103f7a64 [BEAM-30950] Disable failing test for now.
 new ce364e128b2 Merge pull request #30951 [BEAM-30950] Disable failing test for now.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(beam) 01/01: Merge pull request #30951 [BEAM-30950] Disable failing test for now.

2024-04-12 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ce364e128b27433ea660f587d5d5e8e8834591b2
Merge: dac2bd87b2c 179103f7a64
Author: Robert Bradshaw 
AuthorDate: Fri Apr 12 08:36:38 2024 -0700

Merge pull request #30951 [BEAM-30950] Disable failing test for now.

 sdks/python/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(beam) branch master updated (5b3dbf849ce -> dac2bd87b2c)

2024-04-12 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 5b3dbf849ce Add important category (#30920)
 add 883c339372d Add a cross language yaml test.
 add b05a9c7593a Add parquet, avro, and text tests.
 add 0c85f0d5bbb Add java mapping test.
 add 7ed6b6d5b5a Add tests for windowing and combining in java and Python.
 add 99df52cffea Reword docstring.
 new dac2bd87b2c Merge pull request #30880 [YAML] Adds several tests exercising the cross-language capabilities.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/yaml/integration_tests.py  |   8 +-
 .../apache_beam/yaml/tests/{csv.yaml => avro.yaml} |   8 +-
 sdks/python/apache_beam/yaml/tests/java-map.yaml   |  74 +
 .../yaml/tests/{csv.yaml => parquet.yaml}  |   8 +-
 .../apache_beam/yaml/tests/{csv.yaml => sql.yaml}  |  58 ++
 .../apache_beam/yaml/tests/{csv.yaml => text.yaml} |  20 ++--
 sdks/python/apache_beam/yaml/tests/windowing.yaml  | 122 +
 sdks/python/apache_beam/yaml/yaml_combine.py   |   1 -
 sdks/python/apache_beam/yaml/yaml_provider.py  |   6 +-
 9 files changed, 257 insertions(+), 48 deletions(-)
 copy sdks/python/apache_beam/yaml/tests/{csv.yaml => avro.yaml} (91%)
 create mode 100644 sdks/python/apache_beam/yaml/tests/java-map.yaml
 copy sdks/python/apache_beam/yaml/tests/{csv.yaml => parquet.yaml} (90%)
 copy sdks/python/apache_beam/yaml/tests/{csv.yaml => sql.yaml} (50%)
 copy sdks/python/apache_beam/yaml/tests/{csv.yaml => text.yaml} (75%)
 create mode 100644 sdks/python/apache_beam/yaml/tests/windowing.yaml



(beam) 01/01: Merge pull request #30880 [YAML] Adds several tests exercising the cross-language capabilities.

2024-04-12 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit dac2bd87b2c497e7fc9cfce984463c0903a39c9e
Merge: 5b3dbf849ce 99df52cffea
Author: Robert Bradshaw 
AuthorDate: Fri Apr 12 08:35:49 2024 -0700

Merge pull request #30880 [YAML] Adds several tests exercising the cross-language capabilities.

 sdks/python/apache_beam/yaml/integration_tests.py |   8 +-
 sdks/python/apache_beam/yaml/tests/avro.yaml  |  47 +
 sdks/python/apache_beam/yaml/tests/java-map.yaml  |  74 +
 sdks/python/apache_beam/yaml/tests/parquet.yaml   |  47 +
 sdks/python/apache_beam/yaml/tests/sql.yaml   |  59 +++
 sdks/python/apache_beam/yaml/tests/text.yaml  |  47 +
 sdks/python/apache_beam/yaml/tests/windowing.yaml | 122 ++
 sdks/python/apache_beam/yaml/yaml_combine.py  |   1 -
 sdks/python/apache_beam/yaml/yaml_provider.py |   6 +-
 9 files changed, 404 insertions(+), 7 deletions(-)




(beam) branch master updated (701670624b3 -> 1dc3346705a)

2024-04-11 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 701670624b3 Remove comma that breaks formatting (#30931)
 add 1dc3346705a Add gradle target and github workflow for cross-langauge yaml tests. (#30874)

No new revisions were added by this update.

Summary of changes:
 ...ct.yml => beam_PreCommit_Yaml_Xlang_Direct.yml} | 31 +++---
 sdks/python/build.gradle   | 19 +
 2 files changed, 41 insertions(+), 9 deletions(-)
 copy .github/workflows/{beam_PostCommit_Python_Xlang_Gcp_Direct.yml => beam_PreCommit_Yaml_Xlang_Direct.yml} (78%)



(beam) branch master updated (f1a47efc642 -> 1dfd39bc6d0)

2024-04-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from f1a47efc642 Allow lazy iteration for non-reiterables. (#30851)
 add 1dfd39bc6d0 Fix merging with missing environments. (#30864)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/runners/common.py | 17 -
 1 file changed, 12 insertions(+), 5 deletions(-)



(beam) branch master updated (2e630ac3675 -> f1a47efc642)

2024-04-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


 from 2e630ac3675 Lower various logging statement levels to clean up example printing (#30782)
 add f1a47efc642 Allow lazy iteration for non-reiterables. (#30851)

No new revisions were added by this update.

Summary of changes:
 .../beam/sdk/transforms/join/CoGbkResult.java  | 256 +++--
 .../beam/sdk/transforms/join/CoGbkResultTest.java  |  89 ---
 2 files changed, 294 insertions(+), 51 deletions(-)



(beam) branch master updated (6f7c6875368 -> 55f5a43eb37)

2024-04-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6f7c6875368 Correct the version spec (#30856)
 add 55f5a43eb37 Remove outdated comment about docker. (#30871)

No new revisions were added by this update.

Summary of changes:
 website/www/site/content/en/documentation/sdks/yaml.md | 5 +
 1 file changed, 1 insertion(+), 4 deletions(-)



(beam) branch master updated (747d7c78a7a -> a475fdeeb55)

2024-04-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


 from 747d7c78a7a Merge pull request #30842 Proposed edits for Beam YAML overview
 add ef3a942aa42 WiP: Day Zero Tutorial
 add 9a28286c5e7 WiP: Yaml Day Zero tutorial.
 add c89af58a39d Python Notebook for getting started with Yaml API
 add 814eda6b6f3 Small changes to Getting started with yaml notebook
 add 226c2532c21 Yaml notebooks: formatting
 add 1ec22aeefe6 Yaml notebooks: change second pipeline to chain
 add 789a2ce42fa Small content fixes in the getting started with yaml notebook
 add dd23eeaa3d8 Adding word count example to the yaml notebook.
 add e3667016565 Clear outputs of the notebook
 add 8bcb0aaa9f8 Apply PR comments. Add more information on structure of yaml pipelines
 add 3fb02dbe4d7 Switching from`gsutil cp` to `wget`. Dropping introduced SumPer key transform in favour of CombinePerKey with a callable argument
 add c863b676014 Fixing after the merge
 add 9c3adbae86c Editorial suggestions
 add aabea011ec8 Remove dash in the content of Getting Started with Yaml
 add c701db3fff6 Fix typos
 add 1f875af31af Modify Getting Started Notebook - add simpler examples
 add 34b6a0c426a WiP: Day Zero Tutorial
 add 6ac7336c498 WiP: Yaml Day Zero tutorial.
 add 141abb55665 Merge branch 'master' into yaml_day_zero_tutorial
 add aa4a0dee4c9 Python Notebook for getting started with Yaml API
 add 89582f2f83f Add SumPerKey and TopNLargest transforms to Yaml api
 add abc777f9776 Small changes to Getting started with yaml notebook
 add a39362cf14b Yaml notebooks: formatting
 add dee90b2a14a Yaml notebooks: change second pipeline to chain
 add a4a9cedc58f Small content fixes in the getting started with yaml notebook
 add a6b5c4eb963 Adding word count example to the yaml notebook.
 add fd00b1ff57d Clear outputs of the notebook
 add d13f4ed1483 Apply PR comments. Add more information on structure of yaml pipelines
 add bbf7f7d2060 Switching from`gsutil cp` to `wget`. Dropping introduced SumPer key transform in favour of CombinePerKey with a callable argument
 add 00ba6c64b9f Merge branch 'master' into yaml_day_zero_tutorial
 add 781ee6d77b7 Fixing after the merge
 add 615c746691d Editorial suggestions
 add 50c8170c4c7 Remove dash in the content of Getting Started with Yaml
 add 373b9e04f7c Fix typos
 add d4d2b4d76aa Merge remote-tracking branch 'origin/yaml_day_zero_tutorial' into yaml_day_zero_tutorial
 add 048c9557db4 Merge branch 'master' into yaml_day_zero_tutorial
 add f20bfe6002f Merge remote-tracking branch 'upstream/master' into yaml_day_zero_tutorial
 add bb760acdcbf Review comments
 add 3242fcb6384 Review comments
 add ada5e62dfa5 Update examples/notebooks/get-started/try-apache-beam-yaml.ipynb
 add ccf2a5056a6 remove the last code block
 add a38fe29d184 Merge remote-tracking branch 'origin/master' into 
yaml_day_zero_tutorial
 add fad13cdcdc2 Add a link to documentation. Add an explanation on expansion service.
 add 239c0d507f5 typo
 add 176c835f964 Merge branch 'master' into yaml_day_zero_tutorial
 add ee6da7a1b33 Use %%writefile instead of custom python function
 add 60829dc8fc5 Install apache-beam command without --update
 add 3da0b066c4e Install apache-beam[yaml]
 add a475fdeeb55 Merge pull request #27284 Yaml API: Day Zero tutorial notebook

No new revisions were added by this update.

Summary of changes:
 .../get-started/try-apache-beam-yaml.ipynb | 734 +
 1 file changed, 734 insertions(+)
 create mode 100644 examples/notebooks/get-started/try-apache-beam-yaml.ipynb



(beam) branch master updated (040dba18f81 -> 747d7c78a7a)

2024-04-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


 from 040dba18f81 Merge pull request #30862 Better handling of no-output transforms.
 add 358c88d2656 Proposed edits for Beam YAML overview
 new 747d7c78a7a Merge pull request #30842 Proposed edits for Beam YAML overview

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../www/site/content/en/documentation/sdks/yaml.md | 245 ++---
 1 file changed, 162 insertions(+), 83 deletions(-)



(beam) 01/01: Merge pull request #30842 Proposed edits for Beam YAML overview

2024-04-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 747d7c78a7ae67d81fdb238124ce908e31d915e2
Merge: 040dba18f81 358c88d2656
Author: Robert Bradshaw 
AuthorDate: Fri Apr 5 09:50:00 2024 -0700

Merge pull request #30842 Proposed edits for Beam YAML overview

* Organize into several main sections
* Make the "Getting Started" section more procedural.
* Use a self-contained pipeline for Getting Started. (No input data files required)
* Add explanatory text to motivate the example YAMLs
* General style edits

 .../www/site/content/en/documentation/sdks/yaml.md | 245 ++---
 1 file changed, 162 insertions(+), 83 deletions(-)



(beam) 01/01: Merge pull request #30862 Better handling of no-output transforms.

2024-04-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 040dba18f817779c946ce5de7ffc859937d054c7
Merge: ffc96bc2edf a841c55054f
Author: Robert Bradshaw 
AuthorDate: Fri Apr 5 09:09:56 2024 -0700

Merge pull request #30862 Better handling of no-output transforms.

 sdks/python/apache_beam/yaml/yaml_transform.py   | 15 +--
 sdks/python/apache_beam/yaml/yaml_transform_unit_test.py |  3 ++-
 2 files changed, 11 insertions(+), 7 deletions(-)



(beam) branch master updated (ffc96bc2edf -> 040dba18f81)

2024-04-05 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


 from ffc96bc2edf Bump cloud.google.com/go/bigquery from 1.59.1 to 1.60.0 in /sdks (#30837)
 add 9e98ada9ac6 [YAML] Interpret PDone as no outputs.
 add bd1945e3b8a Allow implicit n-arry outputs of chain composite and pipelines.
 add a841c55054f Fix change-detector test.
 new 040dba18f81 Merge pull request #30862 Better handling of no-output transforms.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 sdks/python/apache_beam/yaml/yaml_transform.py   | 15 +--
 sdks/python/apache_beam/yaml/yaml_transform_unit_test.py |  3 ++-
 2 files changed, 11 insertions(+), 7 deletions(-)



(beam) branch master updated (0d41168a096 -> 6c280c64e5c)

2024-04-04 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 0d41168a096 byte-buddy:1.14.12 (#30746)
 add 6c280c64e5c Use empty flags for default expansion service options. (#30858)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/runners/portability/expansion_service.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)



(beam) branch master updated (0e861184775 -> eb5c73da86f)

2024-04-02 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 0e861184775 Add SpannerIO Stress test (#30800)
 add eb5c73da86f Add WaitOn transform to Python, analogous to Java's Wait.on. (#30807)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/transforms/util.py  | 43 +++--
 sdks/python/apache_beam/transforms/util_test.py | 29 +
 2 files changed, 69 insertions(+), 3 deletions(-)



(beam) branch master updated: Don't double-nest display data payloads. (#30812)

2024-04-01 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 4316b2fe203 Don't double-nest display data payloads. (#30812)
4316b2fe203 is described below

commit 4316b2fe203f142a6a89d94764a51159f9ccd408
Author: Robert Bradshaw 
AuthorDate: Mon Apr 1 16:49:58 2024 -0700

Don't double-nest display data payloads. (#30812)
---
 sdks/python/apache_beam/transforms/display.py | 22 +++---
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index 0d1dd552413..86bbf101f56 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -45,6 +45,7 @@ from datetime import datetime
 from datetime import timedelta
 from typing import TYPE_CHECKING
 from typing import List
+from typing import Optional
 from typing import Union
 
 from apache_beam.portability import common_urns
@@ -135,11 +136,7 @@ class DisplayData(object):
 # type: (...) -> List[beam_runner_api_pb2.DisplayData]
 
 """Returns a List of Beam proto representation of Display data."""
-def create_payload(dd):
-  if isinstance(dd, beam_runner_api_pb2.DisplayData):
-return dd
-
-  display_data_dict = None
+def create_payload(dd) -> Optional[beam_runner_api_pb2.LabelledPayload]:
   try:
 display_data_dict = dd.get_dict()
   except ValueError:
@@ -186,12 +183,15 @@ class DisplayData(object):
 
 dd_protos = []
 for dd in self.items:
-  dd_proto = create_payload(dd)
-  if dd_proto:
-dd_protos.append(
-beam_runner_api_pb2.DisplayData(
-urn=common_urns.StandardDisplayData.DisplayData.LABELLED.urn,
-payload=dd_proto.SerializeToString()))
+  if isinstance(dd, beam_runner_api_pb2.DisplayData):
+dd_protos.append(dd)
+  else:
+dd_payload = create_payload(dd)
+if dd_payload:
+  dd_protos.append(
+  beam_runner_api_pb2.DisplayData(
+  urn=common_urns.StandardDisplayData.DisplayData.LABELLED.urn,
+  payload=dd_payload.SerializeToString()))
 return dd_protos
 
   @classmethod
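Before this change, `create_payload` returned a `DisplayData` item unchanged, and the caller then serialized it into the payload of a new `LABELLED` `DisplayData`, nesting one proto inside another. The fix appends such items directly. The dependency-free sketch below shows that control flow; `DisplayDataProto`, `Item`, `LABELLED_URN`, and the payload encoding are hypothetical stand-ins for the `beam_runner_api_pb2` messages and `common_urns` constants, not Beam code.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class DisplayDataProto:
    """Hypothetical stand-in for beam_runner_api_pb2.DisplayData."""
    urn: str
    payload: bytes


@dataclass
class Item:
    """Hypothetical stand-in for a not-yet-converted display data item."""
    key: str
    value: Optional[str]


LABELLED_URN = "labelled"  # placeholder for the real LABELLED URN


def create_payload(item: Item) -> Optional[bytes]:
    # Simplification: items without a value produce no payload.
    if item.value is None:
        return None
    return f"{item.key}={item.value}".encode()


def to_protos(items: List[Union[Item, DisplayDataProto]]) -> List[DisplayDataProto]:
    protos: List[DisplayDataProto] = []
    for item in items:
        if isinstance(item, DisplayDataProto):
            # Already converted: keep it as-is instead of nesting it
            # inside another LABELLED payload.
            protos.append(item)
        else:
            payload = create_payload(item)
            if payload:
                protos.append(DisplayDataProto(LABELLED_URN, payload))
    return protos
```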



(beam) branch master updated: [YAML] Basic integration testing framework. (#29113)

2024-03-29 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f7319a6b6db [YAML] Basic integration testing framework. (#29113)
f7319a6b6db is described below

commit f7319a6b6db8e52da0f014d998b6638bd2da068b
Author: Robert Bradshaw 
AuthorDate: Fri Mar 29 09:50:22 2024 -0700

[YAML] Basic integration testing framework. (#29113)

The tests themselves are defined in yaml as a series of pipelines, possibly with
some setup code.

One of the key features of this framework is that if multiple providers vend the
same transform, each will be tested to ensure they have consistent behavior.
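The last point can be pictured as a Cartesian product: for each transform type that more than one provider vends, choose one of its providers, and run the test pipeline once per combination. The sketch below only enumerates those combinations with `itertools.product`; the transform and provider names are made up, and this is an assumption for illustration, not a description of how `integration_tests.py` actually iterates.

```python
import itertools
from typing import Dict, Iterator, List


def provider_combinations(
        transform_types: List[str],
        providers: Dict[str, List[str]]) -> Iterator[Dict[str, str]]:
    """Yield one {transform_type: provider} assignment per combination."""
    # Only transforms vended by several providers need to be varied.
    multi = sorted(t for t in set(transform_types) if len(providers.get(t, [])) > 1)
    for choice in itertools.product(*(providers[t] for t in multi)):
        yield dict(zip(multi, choice))
```

For `Read` and `Write` each vended by two providers and `Map` by one, this yields four assignments; a pipeline with no multiply-provided transforms yields a single empty assignment, that is, one run with the defaults.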
---
 sdks/python/apache_beam/testing/util.py   |   4 +-
 sdks/python/apache_beam/yaml/integration_tests.py | 173 ++
 sdks/python/apache_beam/yaml/tests/bigquery.yaml  |  77 ++
 sdks/python/apache_beam/yaml/tests/csv.yaml   |  47 ++
 sdks/python/apache_beam/yaml/tests/json.yaml  |  47 ++
 sdks/python/apache_beam/yaml/yaml_provider.py |  12 ++
 6 files changed, 358 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 10a7a8e86f9..cffafa6c074 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -301,12 +301,12 @@ def assert_that(
       if not use_global_window:
         plain_actual = plain_actual | 'AddWindow' >> ParDo(AddWindow())
 
-      plain_actual = plain_actual | 'Match' >> Map(matcher)
+      return plain_actual | 'Match' >> Map(matcher)
 
     def default_label(self):
       return label
 
-  actual | AssertThat()  # pylint: disable=expression-not-assigned
+  return actual | AssertThat()
 
 
 @ptransform_fn
diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py
new file mode 100644
index 000..19c22d1c6d8
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/integration_tests.py
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Runs integration tests in the tests directory."""
+
+import contextlib
+import copy
+import glob
+import itertools
+import logging
+import os
+import unittest
+import uuid
+
+import mock
+import yaml
+
+import apache_beam as beam
+from apache_beam.io import filesystems
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.utils import python_callable
+from apache_beam.yaml import yaml_provider
+from apache_beam.yaml import yaml_transform
+
+
+@contextlib.contextmanager
+def gcs_temp_dir(bucket):
+  gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4())
+  yield gcs_tempdir
+  filesystems.FileSystems.delete([gcs_tempdir])
+
+
+@contextlib.contextmanager
+def temp_bigquery_table(project, prefix='yaml_bq_it_'):
+  bigquery_client = BigQueryWrapper()
+  dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex)
+  bigquery_client.get_or_create_dataset(project, dataset_id)
+  logging.info("Created dataset %s in project %s", dataset_id, project)
+  yield f'{project}:{dataset_id}.tmp_table'
+  request = bigquery.BigqueryDatasetsDeleteRequest(
+      projectId=project, datasetId=dataset_id, deleteContents=True)
+  logging.info("Deleting dataset %s in project %s", dataset_id, project)
+  bigquery_client.client.datasets.Delete(request)
+
+
+def replace_recursive(spec, vars):
+  if isinstance(spec, dict):
+    return {
+        key: replace_recursive(value, vars)
+        for (key, value) in spec.items()
+    }
+  elif isinstance(spec, list):
+    return [replace_recursive(value, vars) for value in spec]
+  elif isinstance(spec, str) and '{' in spec:
+    try:
+      return spec.format(**vars)
+    except Exception as exn:
+      raise ValueError(f"Error evaluating {spec}: {exn}"
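The `replace_recursive` helper walks a nested spec and applies `str.format` substitution to strings containing `{`. Because the function is cut off above, the following is a standalone sketch of the same technique under a hypothetical name, `substitute_vars`; passing all other leaves through unchanged is an assumption. Any literal `{` in a string is treated as a format field, so such strings would need `{{` escaping.

```python
from typing import Any, Dict


def substitute_vars(spec: Any, variables: Dict[str, Any]) -> Any:
  """Recursively apply str.format substitution to every templated string."""
  if isinstance(spec, dict):
    return {
        key: substitute_vars(value, variables)
        for (key, value) in spec.items()
    }
  elif isinstance(spec, list):
    return [substitute_vars(value, variables) for value in spec]
  elif isinstance(spec, str) and '{' in spec:
    try:
      return spec.format(**variables)
    except Exception as exn:
      raise ValueError(f"Error evaluating {spec}: {exn}") from exn
  else:
    # Assumption: numbers, bools and plain strings pass through unchanged.
    return spec
```

For example, with the hypothetical variables `TEMP_DIR='gs://bucket/tmp'` and `ENV='test'`, the spec `{'config': {'path': '{TEMP_DIR}/out.csv', 'limit': 10}, 'tags': ['{ENV}', 'fixed']}` becomes `{'config': {'path': 'gs://bucket/tmp/out.csv', 'limit': 10}, 'tags': ['test', 'fixed']}`.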

(beam) branch master updated: [YAML] Add Partition transform. (#30368)

2024-03-28 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e3fee5156b3 [YAML] Add Partition transform. (#30368)
e3fee5156b3 is described below

commit e3fee5156b3515f96dc5ba90ea2fbc6f6be2bd34
Author: Robert Bradshaw 
AuthorDate: Thu Mar 28 17:25:00 2024 -0700

[YAML] Add Partition transform. (#30368)
---
 .../apache_beam/yaml/programming_guide_test.py |  19 ++
 sdks/python/apache_beam/yaml/readme_test.py|  30 +--
 sdks/python/apache_beam/yaml/yaml_mapping.py   |  88 +++-
 sdks/python/apache_beam/yaml/yaml_mapping_test.py  | 239 +
 sdks/python/apache_beam/yaml/yaml_transform.py |   6 +-
 .../content/en/documentation/programming-guide.md  |  16 ++
 .../site/content/en/documentation/sdks/yaml-udf.md |  68 ++
 7 files changed, 451 insertions(+), 15 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/programming_guide_test.py b/sdks/python/apache_beam/yaml/programming_guide_test.py
index cd7bf6a8814..fe5e242f7f5 100644
--- a/sdks/python/apache_beam/yaml/programming_guide_test.py
+++ b/sdks/python/apache_beam/yaml/programming_guide_test.py
@@ -404,6 +404,25 @@ class ProgrammingGuideTest(unittest.TestCase):
           # [END setting_timestamp]
           ''')
 
+  def test_partition(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      elements = p | beam.Create([
+          beam.Row(percentile=1),
+          beam.Row(percentile=20),
+          beam.Row(percentile=90),
+      ])
+      _ = elements | YamlTransform(
+          '''
+          # [START model_multiple_pcollections_partition]
+          type: Partition
+          config:
+            by: str(percentile // 10)
+            language: python
+            outputs: ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]
+          # [END model_multiple_pcollections_partition]
+          ''')
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
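`Partition` evaluates the `by` expression for each row and sends the row to the output with that name, which is why the test lists outputs `"0"` through `"10"` (percentiles 0 to 100 divided by 10). A pure-Python sketch of that routing, assuming rows are plain dicts; the `partition_rows` helper is hypothetical, and raising on an unlisted output name is this sketch's own choice, not necessarily what the YAML transform does.

```python
from typing import Callable, Dict, List


def partition_rows(
    rows: List[dict], by: Callable[[dict], str], outputs: List[str]
) -> Dict[str, List[dict]]:
  """Route each row to the output whose name is by(row)."""
  result = {name: [] for name in outputs}
  for row in rows:
    name = by(row)
    if name not in result:
      raise ValueError(f'Unexpected output {name!r} for row {row}')
    result[name].append(row)
  return result
```

With the three rows from the test, percentiles 1, 20 and 90 land in outputs `"0"`, `"2"` and `"9"`; the remaining outputs stay empty.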
diff --git a/sdks/python/apache_beam/yaml/readme_test.py b/sdks/python/apache_beam/yaml/readme_test.py
index 4ca60e6176b..ea7a015dab5 100644
--- a/sdks/python/apache_beam/yaml/readme_test.py
+++ b/sdks/python/apache_beam/yaml/readme_test.py
@@ -128,12 +128,25 @@ class FakeAggregation(beam.PTransform):
         lambda _: 1, sum, 'count')
 
 
+class _Fakes:
+  fn = str
+
+  class SomeTransform(beam.PTransform):
+    def __init__(*args, **kwargs):
+      pass
+
+    def expand(self, pcoll):
+      return pcoll
+
+
 RENDER_DIR = None
 TEST_TRANSFORMS = {
     'Sql': FakeSql,
     'ReadFromPubSub': FakeReadFromPubSub,
     'WriteToPubSub': FakeWriteToPubSub,
     'SomeGroupingTransform': FakeAggregation,
+    'SomeTransform': _Fakes.SomeTransform,
+    'AnotherTransform': _Fakes.SomeTransform,
 }
 
 
@@ -155,7 +168,7 @@ class TestEnvironment:
     return path
 
   def input_csv(self):
-    return self.input_file('input.csv', 'col1,col2,col3\nabc,1,2.5\n')
+    return self.input_file('input.csv', 'col1,col2,col3\na,1,2.5\n')
 
   def input_tsv(self):
     return self.input_file('input.tsv', 'col1\tcol2\tcol3\nabc\t1\t2.5\n')
@@ -250,13 +263,15 @@ def parse_test_methods(markdown_lines):
       else:
         if code_lines:
           if code_lines[0].startswith('- type:'):
+            is_chain = not any('input:' in line for line in code_lines)
             # Treat this as a fragment of a larger pipeline.
             # pylint: disable=not-an-iterable
             code_lines = [
                 'pipeline:',
-                '  type: chain',
+                '  type: chain' if is_chain else '',
                 '  transforms:',
                 '    - type: ReadFromCsv',
+                '      name: input',
                 '      config:',
                 '        path: whatever',
             ] + ['    ' + line for line in code_lines]
@@ -278,17 +293,6 @@ def createTestSuite(name, path):
     return type(name, (unittest.TestCase, ), dict(parse_test_methods(readme)))
 
 
-class _Fakes:
-  fn = str
-
-  class SomeTransform(beam.PTransform):
-    def __init__(*args, **kwargs):
-      pass
-
-    def expand(self, pcoll):
-      return pcoll
-
-
 # These are copied from $ROOT/website/www/site/content/en/documentation/sdks
 # at build time.
 YAML_DOCS_DIR = os.path.join(os.path.join(os.path.dirname(__file__), 'docs'))
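In `parse_test_methods`, a documentation fragment that begins with `- type:` is wrapped in a synthetic pipeline whose CSV source is now named `input`, and `type: chain` is emitted only when no line mentions `input:` (fragments that wire inputs explicitly cannot run as a chain). A standalone sketch of that wrapping step, under the hypothetical name `wrap_fragment`:

```python
from typing import List


def wrap_fragment(code_lines: List[str]) -> List[str]:
  """Wrap a '- type:' YAML fragment in a pipeline with a named CSV source."""
  # Fragments that reference inputs explicitly are not run as a chain.
  is_chain = not any('input:' in line for line in code_lines)
  return [
      'pipeline:',
      '  type: chain' if is_chain else '',
      '  transforms:',
      '    - type: ReadFromCsv',
      '      name: input',
      '      config:',
      '        path: whatever',
  ] + ['    ' + line for line in code_lines]
```

Joining the result of `wrap_fragment(['- type: SomeTransform'])` with newlines gives a chain pipeline whose last transform line is `    - type: SomeTransform`; a fragment containing `  input: input` instead produces an empty line where `  type: chain` would be, and its `input:` line ends up indented to sit under that transform.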
diff --git a/sdks/python/a

(beam) branch master updated (069c0459c2c -> 067b8fbb499)

2024-03-28 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 069c0459c2c Fix JaCoCo test report (#30687)
 add 2bddf76b398 Gradle target for building YAML reference manual.
 add df4e9b5424a Commit yaml docs as part of beam release.
 add f04896fef13 Add yaml docs building to release workflow.
 add 29cadc30078 Trailing whitespace.
 new 067b8fbb499 Merge pull request #30741 Gradle target and script for adding YAML reference manual to site.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/build_release_candidate.yml  | 39 ++
 .../src/main/scripts/build_release_candidate.sh| 17 ++
 sdks/python/build.gradle   | 24 -
 3 files changed, 65 insertions(+), 15 deletions(-)



(beam) 01/01: Merge pull request #30741 Gradle target and script for adding YAML reference manual to site.

2024-03-28 Thread robertwb
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 067b8fbb499a7d3ee543c3313c7720981cc9
Merge: 069c0459c2c 29cadc30078
Author: Robert Bradshaw 
AuthorDate: Thu Mar 28 13:02:16 2024 -0700

Merge pull request #30741 Gradle target and script for adding YAML reference manual to site.

 .github/workflows/build_release_candidate.yml  | 39 ++
 .../src/main/scripts/build_release_candidate.sh| 17 ++
 sdks/python/build.gradle   | 24 -
 3 files changed, 65 insertions(+), 15 deletions(-)


