AnandInguva commented on a change in pull request #15900:
URL: https://github.com/apache/beam/pull/15900#discussion_r747689256



##########
File path: sdks/python/apache_beam/examples/fastavro_it_test.py
##########
@@ -123,65 +137,44 @@ def batch_indices(start):
         | 'expand-batches' >> FlatMap(batch_indices) \
         | 'create-records' >> Map(record)
 
-    fastavro_output = '/'.join([self.output, 'fastavro'])
-    avro_output = '/'.join([self.output, 'avro'])
-
     # pylint: disable=expression-not-assigned
     records_pcoll \
     | 'write_fastavro' >> WriteToAvro(
         fastavro_output,
         parse_schema(json.loads(self.SCHEMA_STRING)),
-        use_fastavro=True
     )
+    result = self.test_pipeline.run()
+    result.wait_until_finish()
+    fastavro_pcoll = self.test_pipeline \
+                     | 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
+                     | 'read-fastavro' >> ReadAllFromAvro()
 
-    # pylint: disable=expression-not-assigned
-    records_pcoll \
-    | 'write_avro' >> WriteToAvro(
-        avro_output,
-        Parse(self.SCHEMA_STRING),
-        use_fastavro=False
-    )
+    mapped_fastavro_pcoll = fastavro_pcoll | "map_fastavro" >> Map(
+        lambda x: (x['number'], x))
+    mapped_record_pcoll = records_pcoll | "map_record" >> Map(
+        lambda x: (x['number'], x))
+
+    def validate_record(elem):
+      v = elem[1]
+
+      def assertEqual(l, r):
+        if l != r:
+          raise BeamAssertException('Assertion failed: %s == %s' % (l, r))
+
+      assertEqual(sorted(v.keys()), ['fastavro', 'record_pcoll'])
+      record_pcoll_values = v['record_pcoll']
+      fastavro_values = v['fastavro']
+      assertEqual(record_pcoll_values, fastavro_values)
+      assertEqual(len(record_pcoll_values), 1)

Review comment:
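       Every record gets its own key, because the keys are the distinct integer 
values of the `number` field (0, 1, 2, ...). After grouping by key, each key 
therefore holds exactly one record from each side, which is what the 
`len(record_pcoll_values) == 1` check relies on. The output below confirms this: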
   
   <(0, {'record_pcoll': [{'label': 'abc', 'number': 0, 'number_str': '0', 
'color': 'RED'}], 'fastavro': [{'label': 'abc', 'number': 0, 'number_str': '0', 
'color': 'RED'}]})
   (1, {'record_pcoll': [{'label': 'def', 'number': 1, 'number_str': '1', 
'color': 'ORANGE'}], 'fastavro': [{'label': 'def', 'number': 1, 'number_str': 
'1', 'color': 'ORANGE'}]})
   (2, {'record_pcoll': [{'label': 'ghi', 'number': 2, 'number_str': '2', 
'color': 'YELLOW'}], 'fastavro': [{'label': 'ghi', 'number': 2, 'number_str': 
'2', 'color': 'YELLOW'}]})
   (3, {'record_pcoll': [{'label': 'jkl', 'number': 3, 'number_str': '3', 
'color': 'GREEN'}], 'fastavro': [{'label': 'jkl', 'number': 3, 'number_str': 
'3', 'color': 'GREEN'}]})
   (4, {'record_pcoll': [{'label': 'mno', 'number': 4, 'number_str': '4', 
'color': 'BLUE'}], 'fastavro': [{'label': 'mno', 'number': 4, 'number_str': 
'4', 'color': 'BLUE'}]})
   (5, {'record_pcoll': [{'label': 'pqr', 'number': 5, 'number_str': '5', 
'color': 'PURPLE'}], 'fastavro': [{'label': 'pqr', 'number': 5, 'number_str': 
'5', 'color': 'PURPLE'}]})
   (6, {'record_pcoll': [{'label': 'stu', 'number': 6, 'number_str': '6', 
'color': None}], 'fastavro': [{'label': 'stu', 'number': 6, 'number_str': '6', 
'color': None}]})
   (7, {'record_pcoll': [{'label': 'vwx', 'number': 7, 'number_str': '7', 
'color': 'RED'}], 'fastavro': [{'label': 'vwx', 'number': 7, 'number_str': '7', 
'color': 'RED'}]})
   (8, {'record_pcoll': [{'label': 'abc', 'number': 8, 'number_str': '8', 
'color': 'ORANGE'}], 'fastavro': [{'label': 'abc', 'number': 8, 'number_str': 
'8', 'color': 'ORANGE'}]})
   (9, {'record_pcoll': [{'label': 'def', 'number': 9, 'number_str': '9', 
'color': 'YELLOW'}], 'fastavro': [{'label': 'def', 'number': 9, 'number_str': 
'9', 'color': 'YELLOW'}]})
   (10, {'record_pcoll': [{'label': 'ghi', 'number': 10, 'number_str': '10', 
'color': 'GREEN'}], 'fastavro': [{'label': 'ghi', 'number': 10, 'number_str': 
'10', 'color': 'GREEN'}]})
   (11, {'record_pcoll': [{'label': 'jkl', 'number': 11, 'number_str': '11', 
'color': 'BLUE'}], 'fastavro': [{'label': 'jkl', 'number': 11, 'number_str': 
'11', 'color': 'BLUE'}]})
   (12, {'record_pcoll': [{'label': 'mno', 'number': 12, 'number_str': '12', 
'color': 'PURPLE'}], 'fastavro': [{'label': 'mno', 'number': 12, 'number_str': 
'12', 'color': 'PURPLE'}]})
   (13, {'record_pcoll': [{'label': 'pqr', 'number': 13, 'number_str': '13', 
'color': None}], 'fastavro': [{'label': 'pqr', 'number': 13, 'number_str': 
'13', 'color': None}]})
   (14, {'record_pcoll': [{'label': 'stu', 'number': 14, 'number_str': '14', 
'color': 'RED'}], 'fastavro': [{'label': 'stu', 'number': 14, 'number_str': 
'14', 'color': 'RED'}]})
   (15, {'record_pcoll': [{'label': 'vwx', 'number': 15, 'number_str': '15', 
'color': 'ORANGE'}], 'fastavro': [{'label': 'vwx', 'number': 15, 'number_str': 
'15', 'color': 'ORANGE'}]})
   (16, {'record_pcoll': [{'label': 'abc', 'number': 16, 'number_str': '16', 
'color': 'YELLOW'}], 'fastavro': [{'label': 'abc', 'number': 16, 'number_str': 
'16', 'color': 'YELLOW'}]})
   (17, {'record_pcoll': [{'label': 'def', 'number': 17, 'number_str': '17', 
'color': 'GREEN'}], 'fastavro': [{'label': 'def', 'number': 17, 'number_str': 
'17', 'color': 'GREEN'}]})
   (18, {'record_pcoll': [{'label': 'ghi', 'number': 18, 'number_str': '18', 
'color': 'BLUE'}], 'fastavro': [{'label': 'ghi', 'number': 18, 'number_str': 
'18', 'color': 'BLUE'}]})
   (19, {'record_pcoll': [{'label': 'jkl', 'number': 19, 'number_str': '19', 
'color': 'PURPLE'}], 'fastavro': [{'label': 'jkl', 'number': 19, 'number_str': 
'19', 'color': 'PURPLE'}]})>
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to