[ 
https://issues.apache.org/jira/browse/BEAM-2810?focusedWorklogId=116628&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116628
 ]

ASF GitHub Bot logged work on BEAM-2810:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Jun/18 21:54
            Start Date: 27/Jun/18 21:54
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request #5496: [BEAM-2810] use fastavro in Avro IO
URL: https://github.com/apache/beam/pull/5496#discussion_r198653490
 
 

 ##########
 File path: sdks/python/apache_beam/io/avroio.py
 ##########
 @@ -377,6 +407,56 @@ def split_points_unclaimed(stop_position):
           yield record
 
 
+class _FastAvroSource(filebasedsource.FileBasedSource):
+  """A source for reading Avro files using the `fastavro` library.
+
+  ``_FastAvroSource`` is implemented using the file-based source framework
+  available in module 'filebasedsource'. Hence please refer to module
+  'filebasedsource' to fully understand how this source implements operations
+  common to all file-based sources such as file-pattern expansion and splitting
+  into bundles for parallel processing.
+
+  TODO: remove ``_AvroSource`` in favor of using ``_FastAvroSource``
+  everywhere once it has been more widely tested
+  """
+
+  def read_records(self, file_name, range_tracker):
+    next_block_start = -1
+
+    def split_points_unclaimed(stop_position):
+      if next_block_start >= stop_position:
+        # Next block starts at or after the suggested stop position. Hence
+        # there will not be split points to be claimed for the range ending at
+        # suggested stop position.
+        return 0
+
+      return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN
+
+    range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
+
+    start_offset = range_tracker.start_position()
+    if start_offset is None:
+      start_offset = 0
+
+    with self.open_file(file_name) as f:
+      blocks = block_reader(f)
+      sync_marker = blocks._header['sync']
+
+      # We have to start at current position if previous bundle ended at the
+      # end of a sync marker.
+      start_offset = max(0, start_offset - len(sync_marker))
+      f.seek(start_offset)
+      _AvroUtils.advance_file_past_next_sync_marker(f, sync_marker)
+
+      next_block_start = f.tell()
+
+      while range_tracker.try_claim(f.tell()):
 
 Review comment:
   Sorry, I missed this earlier.
   
   Could you explain how this works? It looks like the blocks iterator has 
already been created by this point. But we want to make sure that the block 
starting at next_block_start is the block read in the first iteration. After 
that, every range_tracker.try_claim() check should be for the starting byte of 
the next block. Does the reader guarantee these properties?
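
To make the question concrete, here is a minimal sketch of the invariant, assuming a toy container format rather than real Avro. It uses neither fastavro nor Beam: `SYNC`, `write_toy_file`, `ToyRangeTracker` and `read_range` are hypothetical names invented for illustration, and the toy layout (a header ending in a sync marker, then blocks of one length byte, a payload and a sync marker) only imitates the shape of an Avro file. The point it shows is that every position passed to `try_claim()` is the first byte of the block that is read next.

```python
import io

SYNC = b"#SYNC#"  # toy stand-in for Avro's 16-byte sync marker (assumption)


def write_toy_file(payloads):
    """Build a header, then blocks of <1-byte length><payload><sync marker>."""
    buf = io.BytesIO()
    buf.write(b"HDR" + SYNC)
    for payload in payloads:
        buf.write(bytes([len(payload)]) + payload + SYNC)
    return buf.getvalue()


def advance_past_next_sync_marker(f):
    """Leave f positioned just after the next sync marker, or at EOF."""
    rest = f.read()
    base = f.tell() - len(rest)
    idx = rest.find(SYNC)
    f.seek(base + idx + len(SYNC) if idx >= 0 else base + len(rest))


class ToyRangeTracker:
    """Hypothetical tracker: a claim succeeds only for positions below stop."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.claimed = []

    def try_claim(self, position):
        if position >= self.stop:
            return False
        self.claimed.append(position)
        return True


def read_range(data, start, stop):
    """Return (payloads, claimed offsets) for blocks starting in [start, stop)."""
    f = io.BytesIO(data)
    tracker = ToyRangeTracker(start, stop)
    # Back up by one marker so that a range starting right after a marker
    # still owns the block that begins at its start offset.
    f.seek(max(0, start - len(SYNC)))
    advance_past_next_sync_marker(f)
    records = []
    # Claim the block's first byte, then read exactly that block.
    while f.tell() < len(data) and tracker.try_claim(f.tell()):
        size = f.read(1)[0]
        records.append(f.read(size))
        assert f.read(len(SYNC)) == SYNC
    return records, tracker.claimed


data = write_toy_file([b"aa", b"bbb", b"c"])
first, first_claims = read_range(data, 0, 20)
second, second_claims = read_range(data, 20, len(data))
print(first, first_claims)
print(second, second_claims)
```

In this sketch the two ranges return every block exactly once, even though offset 20 falls in the middle of a block. That works only because the reader and the claimed position share one file handle. Whether an iterator created before the `f.seek()` call behaves the same way is the property I am asking about.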

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 116628)
    Time Spent: 3h 50m  (was: 3h 40m)

> Consider a faster Avro library in Python
> ----------------------------------------
>
>                 Key: BEAM-2810
>                 URL: https://issues.apache.org/jira/browse/BEAM-2810
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> https://stackoverflow.com/questions/45870789/bottleneck-on-data-source
> It seems this job is reading Avro files (exported by BigQuery) at about 2 
> MB/s.
> We use the standard Python "avro" library, which is apparently known to be 
> very slow (10x+ slower than Java); see 
> http://apache-avro.679487.n3.nabble.com/Avro-decode-very-slow-in-Python-td4034422.html
> There are alternatives, e.g. https://pypi.python.org/pypi/fastavro/



