AnandInguva commented on a change in pull request #15900:
URL: https://github.com/apache/beam/pull/15900#discussion_r747689256
##########
File path: sdks/python/apache_beam/examples/fastavro_it_test.py
##########
@@ -123,65 +137,44 @@ def batch_indices(start):
| 'expand-batches' >> FlatMap(batch_indices) \
| 'create-records' >> Map(record)
- fastavro_output = '/'.join([self.output, 'fastavro'])
- avro_output = '/'.join([self.output, 'avro'])
-
# pylint: disable=expression-not-assigned
records_pcoll \
| 'write_fastavro' >> WriteToAvro(
fastavro_output,
parse_schema(json.loads(self.SCHEMA_STRING)),
- use_fastavro=True
)
+ result = self.test_pipeline.run()
+ result.wait_until_finish()
+ fastavro_pcoll = self.test_pipeline \
+ | 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
+ | 'read-fastavro' >> ReadAllFromAvro()
- # pylint: disable=expression-not-assigned
- records_pcoll \
- | 'write_avro' >> WriteToAvro(
- avro_output,
- Parse(self.SCHEMA_STRING),
- use_fastavro=False
- )
+ mapped_fastavro_pcoll = fastavro_pcoll | "map_fastavro" >> Map(
+ lambda x: (x['number'], x))
+ mapped_record_pcoll = records_pcoll | "map_record" >> Map(
+ lambda x: (x['number'], x))
+
+ def validate_record(elem):
+ v = elem[1]
+
+ def assertEqual(l, r):
+ if l != r:
+ raise BeamAssertException('Assertion failed: %s == %s' % (l, r))
+
+ assertEqual(sorted(v.keys()), ['fastavro', 'record_pcoll'])
+ record_pcoll_values = v['record_pcoll']
+ fastavro_values = v['fastavro']
+ assertEqual(record_pcoll_values, fastavro_values)
+ assertEqual(len(record_pcoll_values), 1)
Review comment:
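Each record gets a unique key: the keys are the records' `number` values, which are distinct integers in ascending order. So after grouping, every key maps to exactly one record from each side, which is why `len(record_pcoll_values) == 1` holds. The grouped output looks like this: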
```
(0, {'record_pcoll': [{'label': 'abc', 'number': 0, 'number_str': '0', 'color': 'RED'}], 'fastavro': [{'label': 'abc', 'number': 0, 'number_str': '0', 'color': 'RED'}]})
(1, {'record_pcoll': [{'label': 'def', 'number': 1, 'number_str': '1', 'color': 'ORANGE'}], 'fastavro': [{'label': 'def', 'number': 1, 'number_str': '1', 'color': 'ORANGE'}]})
(2, {'record_pcoll': [{'label': 'ghi', 'number': 2, 'number_str': '2', 'color': 'YELLOW'}], 'fastavro': [{'label': 'ghi', 'number': 2, 'number_str': '2', 'color': 'YELLOW'}]})
(3, {'record_pcoll': [{'label': 'jkl', 'number': 3, 'number_str': '3', 'color': 'GREEN'}], 'fastavro': [{'label': 'jkl', 'number': 3, 'number_str': '3', 'color': 'GREEN'}]})
(4, {'record_pcoll': [{'label': 'mno', 'number': 4, 'number_str': '4', 'color': 'BLUE'}], 'fastavro': [{'label': 'mno', 'number': 4, 'number_str': '4', 'color': 'BLUE'}]})
(5, {'record_pcoll': [{'label': 'pqr', 'number': 5, 'number_str': '5', 'color': 'PURPLE'}], 'fastavro': [{'label': 'pqr', 'number': 5, 'number_str': '5', 'color': 'PURPLE'}]})
(6, {'record_pcoll': [{'label': 'stu', 'number': 6, 'number_str': '6', 'color': None}], 'fastavro': [{'label': 'stu', 'number': 6, 'number_str': '6', 'color': None}]})
(7, {'record_pcoll': [{'label': 'vwx', 'number': 7, 'number_str': '7', 'color': 'RED'}], 'fastavro': [{'label': 'vwx', 'number': 7, 'number_str': '7', 'color': 'RED'}]})
(8, {'record_pcoll': [{'label': 'abc', 'number': 8, 'number_str': '8', 'color': 'ORANGE'}], 'fastavro': [{'label': 'abc', 'number': 8, 'number_str': '8', 'color': 'ORANGE'}]})
(9, {'record_pcoll': [{'label': 'def', 'number': 9, 'number_str': '9', 'color': 'YELLOW'}], 'fastavro': [{'label': 'def', 'number': 9, 'number_str': '9', 'color': 'YELLOW'}]})
(10, {'record_pcoll': [{'label': 'ghi', 'number': 10, 'number_str': '10', 'color': 'GREEN'}], 'fastavro': [{'label': 'ghi', 'number': 10, 'number_str': '10', 'color': 'GREEN'}]})
(11, {'record_pcoll': [{'label': 'jkl', 'number': 11, 'number_str': '11', 'color': 'BLUE'}], 'fastavro': [{'label': 'jkl', 'number': 11, 'number_str': '11', 'color': 'BLUE'}]})
(12, {'record_pcoll': [{'label': 'mno', 'number': 12, 'number_str': '12', 'color': 'PURPLE'}], 'fastavro': [{'label': 'mno', 'number': 12, 'number_str': '12', 'color': 'PURPLE'}]})
(13, {'record_pcoll': [{'label': 'pqr', 'number': 13, 'number_str': '13', 'color': None}], 'fastavro': [{'label': 'pqr', 'number': 13, 'number_str': '13', 'color': None}]})
(14, {'record_pcoll': [{'label': 'stu', 'number': 14, 'number_str': '14', 'color': 'RED'}], 'fastavro': [{'label': 'stu', 'number': 14, 'number_str': '14', 'color': 'RED'}]})
(15, {'record_pcoll': [{'label': 'vwx', 'number': 15, 'number_str': '15', 'color': 'ORANGE'}], 'fastavro': [{'label': 'vwx', 'number': 15, 'number_str': '15', 'color': 'ORANGE'}]})
(16, {'record_pcoll': [{'label': 'abc', 'number': 16, 'number_str': '16', 'color': 'YELLOW'}], 'fastavro': [{'label': 'abc', 'number': 16, 'number_str': '16', 'color': 'YELLOW'}]})
(17, {'record_pcoll': [{'label': 'def', 'number': 17, 'number_str': '17', 'color': 'GREEN'}], 'fastavro': [{'label': 'def', 'number': 17, 'number_str': '17', 'color': 'GREEN'}]})
(18, {'record_pcoll': [{'label': 'ghi', 'number': 18, 'number_str': '18', 'color': 'BLUE'}], 'fastavro': [{'label': 'ghi', 'number': 18, 'number_str': '18', 'color': 'BLUE'}]})
(19, {'record_pcoll': [{'label': 'jkl', 'number': 19, 'number_str': '19', 'color': 'PURPLE'}], 'fastavro': [{'label': 'jkl', 'number': 19, 'number_str': '19', 'color': 'PURPLE'}]})
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]