[ https://issues.apache.org/jira/browse/BEAM-2810?focusedWorklogId=116628&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116628 ]

ASF GitHub Bot logged work on BEAM-2810:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Jun/18 21:54
            Start Date: 27/Jun/18 21:54
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on a change in pull request #5496: [BEAM-2810] use fastavro in Avro IO
URL: https://github.com/apache/beam/pull/5496#discussion_r198653490

 ##########
 File path: sdks/python/apache_beam/io/avroio.py
 ##########
 @@ -377,6 +407,56 @@ def split_points_unclaimed(stop_position):
            yield record

 +class _FastAvroSource(filebasedsource.FileBasedSource):
 +  """A source for reading Avro files using the `fastavro` library.
 +
 +  ``_FastAvroSource`` is implemented using the file-based source framework
 +  available in module 'filebasedsource'. Hence please refer to module
 +  'filebasedsource' to fully understand how this source implements operations
 +  common to all file-based sources such as file-pattern expansion and splitting
 +  into bundles for parallel processing.
 +
 +  TODO: remove ``_AvroSource`` in favor of using ``_FastAvroSource``
 +  everywhere once it has been more widely tested
 +  """
 +
 +  def read_records(self, file_name, range_tracker):
 +    next_block_start = -1
 +
 +    def split_points_unclaimed(stop_position):
 +      if next_block_start >= stop_position:
 +        # Next block starts at or after the suggested stop position. Hence
 +        # there will not be split points to be claimed for the range ending at
 +        # suggested stop position.
 +        return 0
 +
 +      return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN
 +
 +    range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
 +
 +    start_offset = range_tracker.start_position()
 +    if start_offset is None:
 +      start_offset = 0
 +
 +    with self.open_file(file_name) as f:
 +      blocks = block_reader(f)
 +      sync_marker = blocks._header['sync']
 +
 +      # We have to start at current position if previous bundle ended at the
 +      # end of a sync marker.
 +      start_offset = max(0, start_offset - len(sync_marker))
 +      f.seek(start_offset)
 +      _AvroUtils.advance_file_past_next_sync_marker(f, sync_marker)
 +
 +      next_block_start = f.tell()
 +
 +      while range_tracker.try_claim(f.tell()):

 Review comment:
   Sorry, I missed this. Could you explain how this works? It looks like the
   blocks iterator has already been created by this point, but we want to make
   sure that the block starting at next_block_start is the block read in the
   first iteration. After that, every range_tracker.try_claim() check should
   be for the starting byte of a subsequent block. Does the reader guarantee
   these properties?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 116628)
    Time Spent: 3h 50m  (was: 3h 40m)

> Consider a faster Avro library in Python
> ----------------------------------------
>
>                 Key: BEAM-2810
>                 URL: https://issues.apache.org/jira/browse/BEAM-2810
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> https://stackoverflow.com/questions/45870789/bottleneck-on-data-source
> Seems like this job is reading Avro files (exported by BigQuery) at about 2
> MB/s.
> We use the standard Python "avro" library which is apparently known to be
> very slow (10x+ slower than Java)
> http://apache-avro.679487.n3.nabble.com/Avro-decode-very-slow-in-Python-td4034422.html,
> and there are alternatives e.g. https://pypi.python.org/pypi/fastavro/

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)