kamilwu commented on a change in pull request #11856:
URL: https://github.com/apache/beam/pull/11856#discussion_r498153192



##########
File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
##########
@@ -122,85 +122,95 @@ def process(self, unused_element, side_input):
             # No-op. We only make sure that the element is accessed.
             next(it)
           except StopIteration:
-            return
+            break
 
     class MappingSideInputTestDoFn(beam.DoFn):
-      """Take a sequence of keys as an additional side input and for each
-      key in the sequence checks the value for key in the dictionary."""
-      def process(self, unused_element, dict_side_input, keys_to_check):
-        for key in keys_to_check:
-          # No-op. We only make sure that the element is accessed.
-          dict_side_input[key]
-
-    class GetRandomKeys(beam.DoFn):
-      def __init__(self, n):
-        self._n = n
+      """Iterates over first n keys in the dictionary and checks the value."""
+      def __init__(self, first_n):
+        self._first_n = first_n
 
       def process(self, unused_element, dict_side_input):
-        import random
-        n = min(self._n, len(dict_side_input))
-        return random.sample(dict_side_input.keys(), n)
+        i = 0
+        for key in dict_side_input:
+          if i == self._first_n:
+            break
+          # No-op. We only make sure that the element is accessed.
+          dict_side_input[key]
+          i += 1
 
-    class AddEventTimestamps(beam.DoFn):
-      """Assign timestamp to each element of PCollection."""
-      def setup(self):
-        self._timestamp = 0
+    @typehints.with_input_types(int)
+    @typehints.with_output_types(int)
+    class AssignTimestamps(beam.DoFn):
+      """Produces timestamped values. Timestamps are equal to the value of the
+      element."""
+      def __init__(self):
+        # Avoid having to use save_main_session
+        self.window = window
 
       def process(self, element):
-        from apache_beam.transforms.combiners import window
-        yield window.TimestampedValue(element, self._timestamp)
-        self._timestamp += 1
-
-    input_pc = (
-        self.pipeline
-        | 'Read synthetic' >> beam.io.Read(
-            SyntheticSource(self.parse_synthetic_source_options()))
-        | 'Collect start time metrics' >> beam.ParDo(
-            MeasureTime(self.metrics_namespace)))
-
-    if self.side_input_size != self.input_options.get('num_records'):
-      side_input = (
-          input_pc
-          | 'Sample {} elements'.format(self.side_input_size) >>
-          beam.combiners.Sample.FixedSizeGlobally(self.side_input_size)
-          | 'Flatten a sequence' >> beam.FlatMap(lambda x: x))
+        yield self.window.TimestampedValue(element, element)
+
+    @typehints.with_input_types(Any)
+    @typehints.with_output_types(Dict[str, Union[int, str]])
+    class GetSyntheticSDFOptions(beam.DoFn):
+      def __init__(self, elements_per_record, key_size, value_size):
+        self.elements_per_record = elements_per_record
+        self.key_size = key_size
+        self.value_size = value_size
+
+      def process(self, unused_element):
+        yield {
+            'num_records': self.elements_per_record,
+            'key_size': self.key_size,
+            'value_size': self.value_size,
+            'initial_splitting_num_bundles': 0,
+            'initial_splitting_desired_bundle_size': 0,
+            'sleep_per_input_record_sec': 0,
+            'initial_splitting': 'const'
+        }
+
+    main_input = self.pipeline | 'Create' >> beam.Create(range(self.windows))
+
+    initial_elements = self.SDF_INITIAL_ELEMENTS
+    if self.windows > 1:
+      main_input = (
+          main_input
+          | 'Assign timestamps' >> beam.ParDo(AssignTimestamps())
+          | 'Apply windows' >> beam.WindowInto(window.FixedWindows(1)))
+      side_input = main_input
+      initial_elements = self.windows
     else:
-      side_input = input_pc
-
-    if self.windows > 0:
-      window_size = self.side_input_size / self.windows
-      logging.info('Fixed windows of %s seconds will be applied', window_size)
-      side_input = (
-          side_input
-          | 'Add event timestamps' >> beam.ParDo(AddEventTimestamps())
-          | 'Apply windows' >> beam.WindowInto(
-              beam.combiners.window.FixedWindows(window_size)))
+      side_input = self.pipeline | 'Side input: create' >> beam.Create(
+          range(initial_elements))

Review comment:
       Generally speaking, `initial_elements` should be equal to the number of 
windows. But there is an edge case. By default, a global window is used 
(so the number of windows is 1): 
   ```python
   self.get_option_or_default('window_count', default=1)
   ```
   I noticed that the pipeline runs slower when the number of elements before 
the `SyntheticSDFAsSource` step is 1. In other words, it's more efficient to 
generate a big load of data from, say, 1000 initial elements than from 1 
element. More than half of our test cases use only 1 window, so this is an 
important improvement.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to