This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e16d401987d [YAML] Clean up some confusing error messages. (#29481)
e16d401987d is described below

commit e16d401987d9656bf791afc0d4679b09b8f3fe04
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Tue Dec 5 13:15:33 2023 -0800

    [YAML] Clean up some confusing error messages. (#29481)
    
    * Provide sane defaults rather than NullPtrExceptions for optional BigQuery 
parameters.
    * Better error when cross-language is used with (incompatible) local 
streaming Python runner.
    * Add format and schema to readme PubSub examples.
---
 ...ueryStorageWriteApiSchemaTransformProvider.java | 11 +++++----
 .../apache_beam/runners/direct/direct_runner.py    | 11 +++++++++
 sdks/python/apache_beam/yaml/README.md             | 28 +++++++++++++++++++++-
 sdks/python/apache_beam/yaml/readme_test.py        |  4 ++--
 4 files changed, 47 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 39e6fd7c809..98cc246ce0d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -338,10 +338,13 @@ public class 
BigQueryStorageWriteApiSchemaTransformProvider
       if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
         Long triggeringFrequency = 
configuration.getTriggeringFrequencySeconds();
         Boolean autoSharding = configuration.getAutoSharding();
-        Integer numStreams = configuration.getNumStreams();
+        int numStreams = configuration.getNumStreams() == null ? 0 : 
configuration.getNumStreams();
+        boolean useAtLeastOnceSemantics =
+            configuration.getUseAtLeastOnceSemantics() == null
+                ? false
+                : configuration.getUseAtLeastOnceSemantics();
         // Triggering frequency is only applicable for exactly-once
-        if (configuration.getUseAtLeastOnceSemantics() == null
-            || !configuration.getUseAtLeastOnceSemantics()) {
+        if (!useAtLeastOnceSemantics) {
           write =
               write.withTriggeringFrequency(
                   (triggeringFrequency == null || triggeringFrequency <= 0)
@@ -349,7 +352,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
                       : Duration.standardSeconds(triggeringFrequency));
         }
         // set num streams if specified, otherwise default to autoSharding
-        if (numStreams != null && numStreams > 0) {
+        if (numStreams > 0) {
           write = write.withNumStorageWriteApiStreams(numStreams);
         } else if (autoSharding == null || autoSharding) {
           write = write.withAutoSharding();
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py 
b/sdks/python/apache_beam/runners/direct/direct_runner.py
index db53e4122bb..a470ba80d8e 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -507,6 +507,17 @@ class BundleBasedDirectRunner(PipelineRunner):
     from apache_beam.runners.direct.transform_evaluator import \
       TransformEvaluatorRegistry
     from apache_beam.testing.test_stream import TestStream
+    from apache_beam.transforms.external import ExternalTransform
+
+    class VerifyNoCrossLanguageTransforms(PipelineVisitor):
+      """Visitor determining whether a Pipeline uses a TestStream."""
+      def visit_transform(self, applied_ptransform):
+        if isinstance(applied_ptransform.transform, ExternalTransform):
+          raise RuntimeError(
+              "Streaming Python direct runner "
+              "does not support cross-language pipelines.")
+
+    pipeline.visit(VerifyNoCrossLanguageTransforms())
 
     # If the TestStream I/O is used, use a mock test clock.
     class TestStreamUsageVisitor(PipelineVisitor):
diff --git a/sdks/python/apache_beam/yaml/README.md 
b/sdks/python/apache_beam/yaml/README.md
index e1528a8e480..0b5c118ba8a 100644
--- a/sdks/python/apache_beam/yaml/README.md
+++ b/sdks/python/apache_beam/yaml/README.md
@@ -274,6 +274,13 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
+        format: json
+        schema:
+          type: object
+          properties:
+            col1: {type: string}
+            col2: {type: integer}
+            col3: {type: number}
     - type: WindowInto
       windowing:
         type: fixed
@@ -282,6 +289,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
 ```
 
 Rather than using an explicit `WindowInto` operation, one may instead tag a
@@ -295,6 +303,8 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
+        format: ...
+        schema: ...
     - type: SomeAggregation
       windowing:
         type: sliding
@@ -303,6 +313,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
 ```
 
 Note that the `Sql` operation itself is often a from of aggregation, and
@@ -316,6 +327,8 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
+        format: ...
+        schema: ...
     - type: Sql
       config:
         query: "select col1, count(*) as c from PCOLLECTION"
@@ -325,6 +338,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
 ```
 
 The specified windowing is applied to all inputs, in this case resulting in
@@ -337,11 +351,15 @@ pipeline:
       name: ReadLeft
       config:
         topic: leftTopic
+        format: ...
+        schema: ...
 
     - type: ReadFromPubSub
       name: ReadRight
       config:
         topic: rightTopic
+        format: ...
+        schema: ...
 
     - type: Sql
       config:
@@ -364,7 +382,9 @@ pipeline:
   transforms:
     - type: ReadFromPubSub
       config:
-       topic: myPubSubTopic
+        topic: myPubSubTopic
+        format: ...
+        schema: ...
       windowing:
         type: fixed
         size: 60
@@ -374,6 +394,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
 ```
 
 One can also specify windowing at the top level of a pipeline (or composite),
@@ -388,12 +409,15 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
+        format: ...
+        schema: ...
     - type: Sql
       config:
         query: "select col1, count(*) as c from PCOLLECTION"
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
   windowing:
     type: fixed
     size: 60
@@ -410,6 +434,8 @@ pipeline:
     type: ReadFromPubSub
     config:
       topic: myPubSubTopic
+      format: ...
+      schema: ...
     windowing:
       type: fixed
       size: 10
diff --git a/sdks/python/apache_beam/yaml/readme_test.py 
b/sdks/python/apache_beam/yaml/readme_test.py
index 7f2d193bf35..8ec9b22cc92 100644
--- a/sdks/python/apache_beam/yaml/readme_test.py
+++ b/sdks/python/apache_beam/yaml/readme_test.py
@@ -99,7 +99,7 @@ class FakeSql(beam.PTransform):
 
 
 class FakeReadFromPubSub(beam.PTransform):
-  def __init__(self, topic):
+  def __init__(self, topic, format, schema):
     pass
 
   def expand(self, p):
@@ -112,7 +112,7 @@ class FakeReadFromPubSub(beam.PTransform):
 
 
 class FakeWriteToPubSub(beam.PTransform):
-  def __init__(self, topic):
+  def __init__(self, topic, format):
     pass
 
   def expand(self, pcoll):

Reply via email to