boyuanzz commented on a change in pull request #13405:
URL: https://github.com/apache/beam/pull/13405#discussion_r537892923



##########
File path: sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
##########
@@ -329,3 +330,46 @@ def expand(self, pcoll):
         return {key: out for key in self.outputs}
 
     return WriteToBigQuery(ptransform, self.outputs)
+
+
+class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride):
+  """A ``PTransformOverride`` for ``GroupIntoBatches.WithShardedKey``.
+
+  This override simply returns the original transform but additionally records
+  the output PCollection in order to append required step properties during
+  graph translation.
+  """
+  def __init__(self, dataflow_runner, options):
+    self.dataflow_runner = dataflow_runner
+    self.options = options
+
+  def matches(self, applied_ptransform):
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam import util
+
+    transform = applied_ptransform.transform
+
+    if not isinstance(transform, util.GroupIntoBatches.WithShardedKey):
+      return False
+
+    # The replacement is only valid for portable Streaming Engine jobs.

Review comment:
       ```suggestion
       # The replacement is only valid for portable Streaming Engine jobs with runner_v2.
   ```

##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -825,7 +827,60 @@ def expand(self, pcoll):
               key_value[1]))
       return (
           sharded_pcoll
-          | GroupIntoBatches(self.batch_size, self.max_buffering_duration_secs))
+          | GroupIntoBatches(
+              self.params.batch_size, self.params.max_buffering_duration_secs))
+
+    def to_runner_api_parameter(
+        self,
+        unused_context  # type: PipelineContext
+    ):  # type: (...) -> Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload]
+      return (
+          common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn,
+          self.params.get_payload())

Review comment:
       We can get rid of `GroupIntoBatchesParams` by creating a
`GroupIntoBatchesPayload` directly here and accessing the payload directly
below.
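
       A rough sketch of that shape, using a stand-in dataclass in place of the
generated `beam_runner_api_pb2.GroupIntoBatchesPayload` so the snippet runs
without Beam installed. The class name `WithShardedKeySketch`, the placeholder
URN string, and the seconds-to-millis conversion are assumptions made for
illustration, not the code in this PR.

```python
from dataclasses import dataclass

# Stand-in for common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn;
# the literal value is a placeholder, not the real URN.
SHARDED_KEY_URN = 'placeholder:group_into_batches_with_sharded_key'


@dataclass
class GroupIntoBatchesPayload:
  """Stand-in for beam_runner_api_pb2.GroupIntoBatchesPayload."""
  batch_size: int = 0
  max_buffering_duration_millis: int = 0


class WithShardedKeySketch:
  """Hypothetical transform that keeps its parameters as plain attributes."""
  def __init__(self, batch_size, max_buffering_duration_secs=None):
    self.batch_size = batch_size
    self.max_buffering_duration_secs = max_buffering_duration_secs

  def to_runner_api_parameter(self, unused_context):
    # Build the payload here instead of going through a params wrapper.
    payload = GroupIntoBatchesPayload(
        batch_size=self.batch_size,
        max_buffering_duration_millis=int(
            (self.max_buffering_duration_secs or 0) * 1000))
    return SHARDED_KEY_URN, payload

  @staticmethod
  def from_runner_api_parameter(payload):
    # Read the payload fields directly when rebuilding the transform.
    return WithShardedKeySketch(
        payload.batch_size, payload.max_buffering_duration_millis / 1000)
```

       With this layout the payload is the only serialized form, so there is no
separate params object to keep in sync with the proto fields.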

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/names.py
##########
@@ -69,6 +69,7 @@ class PropertyNames(object):
 
   Property strings as they are expected in the CloudWorkflow protos.
   """
+  ALLOWS_SHARDABLE_STATE = 'allows_shardable_state'

Review comment:
       Could you please add some explanation of what `ALLOWS_SHARDABLE_STATE`
and `PRESERVES_KEYS` mean?

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -706,6 +714,16 @@ message PubSubWritePayload {
   string id_attribute = 3;
 }
 
+// Payload for GroupIntoBatches composite transform.
+message GroupIntoBatchesPayload {
+
+  // (Required) Max size of a batch.
+  int64 batch_size = 1;
+
+  // (Optional) Max duration a batch is allowed to be cached in states.
+  int64 max_buffering_duration_millis = 2;

Review comment:
       It seems like `max_buffering_duration_millis is None` and
`max_buffering_duration_millis == 0` mean the same thing, right? You may want
to add a comment about what happens if `max_buffering_duration_millis` is not
specified or is set to 0. Otherwise someone might think that
`max_buffering_duration_millis = 0` or `max_buffering_duration_millis is None`
makes the transform output elements as soon as possible.
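
       To illustrate the ambiguity: in proto3 an unset `int64` field reads back
as 0, so once the value is in the payload, `None` and `0` cannot be told apart.
A small sketch with hypothetical helper names, assuming 0 is meant as "no time
limit" rather than "emit immediately":

```python
def to_payload_millis(max_buffering_duration_secs):
  """Converts the Python-side parameter to the value stored in the payload.

  None and 0 both collapse to 0, mirroring how an unset proto3 int64 field
  reads back as 0.
  """
  return int((max_buffering_duration_secs or 0) * 1000)


def has_buffering_limit(max_buffering_duration_millis):
  # Assumption: 0 means "no time limit", not "flush as soon as possible".
  return max_buffering_duration_millis > 0
```

       Whichever meaning is intended, stating it in the proto comment removes the
guesswork for readers of the field.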




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

