[
https://issues.apache.org/jira/browse/BAHIR-213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896994#comment-16896994
]
Abhishek Dixit edited comment on BAHIR-213 at 7/31/19 9:51 AM:
---------------------------------------------------------------
* *Issue of FNFE when querying file:* I checked the behavior of the existing
implementation of FileStreamSource for this scenario, i.e. if the file gets
listed in the _getOffset_ phase but is deleted before the file read happens in
_getBatch_, Spark catches the FNFE and logs the following warning:
{code:java}
The directory $path was not found. Was it deleted very recently?{code}
I have gone with similar behavior in SqsSource (see the first sketch after
this list). Let me know if you have any concerns about mimicking
FileStreamSource behavior for this scenario.
* *Issue of File State Update:* In the case of FileStreamSource, files are
listed in the _getOffset_ phase, and if the contents of a file change before
the file is actually read in the _getBatch_ phase, the initial content of the
file is lost and not processed; only the latest content available during
_getBatch_ is processed. SQS Source will behave the same way.
* *Issue of Double Update:* On the other hand, if the file is processed in
_getBatch_ and then updated later, FileStreamSource considers the file
seen/processed and doesn't read it again unless the entry is aged out. SQS
Source behaves in the same way, i.e. if an SQS message pertaining to an
already processed file arrives again, it is simply ignored (see the second
sketch after this list).
* *Issue of Messages arriving out of order:* By default, the new/unprocessed
file list obtained from the SQS File Cache is sorted by file-update timestamp
to mitigate messages arriving out of order (this ordering step also appears in
the second sketch after this list). If some messages arrive very late, they
will be processed in succeeding micro-batches. In any case, we can't guarantee
processing files in timestamp order because of the distributed nature of SQS.
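For reference, a minimal sketch of the FNFE handling described in the first
bullet (_readFile_ and _logWarning_ are illustrative stand-ins for the real
read path and Spark's Logging trait, not the actual SqsSource code):
{code:scala}
import java.io.FileNotFoundException

// Illustrative stand-ins; not the actual connector API.
def readFile(path: String): String =
  scala.io.Source.fromFile(path).mkString

def logWarning(msg: String): Unit =
  System.err.println(s"WARN: $msg")

// A file listed in the getOffset phase may be deleted before getBatch reads
// it; catch the FNFE, log the warning, and skip that file.
def readBatch(paths: Seq[String]): Seq[String] =
  paths.flatMap { path =>
    try Some(readFile(path))
    catch {
      case _: FileNotFoundException =>
        logWarning(s"The directory $path was not found. Was it deleted very recently?")
        None
    }
  }
{code}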
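And a sketch of the seen-file de-duplication and timestamp ordering from the
last two bullets (_FileEntry_ and _seenFiles_ are illustrative names, modeled
loosely on FileStreamSource's seen-files map, not the actual internals):
{code:scala}
import scala.collection.mutable

// Illustrative entry: S3 path plus the file-update timestamp carried in the
// SQS message.
case class FileEntry(path: String, timestampMs: Long)

// Paths already handed to a batch; a message for a seen path is ignored
// unless the entry has been aged out (aging omitted here for brevity).
val seenFiles = mutable.Set[String]()

def selectUnprocessed(fromCache: Seq[FileEntry]): Seq[FileEntry] = {
  val unseen = fromCache.filterNot(e => seenFiles.contains(e.path))
  seenFiles ++= unseen.map(_.path)
  // Sort by file-update timestamp to mitigate out-of-order delivery; very
  // late messages simply land in a succeeding micro-batch.
  unseen.sortBy(_.timestampMs)
}
{code}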
[[email protected]] Let me know if you have any concerns with any of the above
points and want me to change the implementation in some way.
> Faster S3 file Source for Structured Streaming with SQS
> -------------------------------------------------------
>
> Key: BAHIR-213
> URL: https://issues.apache.org/jira/browse/BAHIR-213
> Project: Bahir
> Issue Type: New Feature
> Components: Spark Structured Streaming Connectors
> Affects Versions: Spark-2.4.0
> Reporter: Abhishek Dixit
> Priority: Major
>
> Using FileStreamSource to read files from an S3 bucket has problems in
> terms of both costs and latency:
> * *Latency:* Listing all the files in the S3 bucket every microbatch can be
> both slow and resource-intensive.
> * *Costs:* Making List API requests to S3 every microbatch can be costly.
> The solution is to use Amazon Simple Queue Service (SQS), which lets you
> discover new files written to an S3 bucket without listing all the files
> every microbatch.
> S3 buckets can be configured to send notifications to an Amazon SQS queue on
> Object Create / Object Delete events. For details, see the AWS documentation:
> [Configuring S3 Event
> Notifications|https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html]
>
> Spark can leverage this to find new files written to an S3 bucket by reading
> notifications from the SQS queue instead of listing files every microbatch.
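> A rough sketch of this flow, assuming the AWS SDK v1 and Jackson (the queue
> URL is a placeholder; this is not the connector's actual code):
> {code:scala}
> import com.amazonaws.services.sqs.AmazonSQSClientBuilder
> import com.amazonaws.services.sqs.model.ReceiveMessageRequest
> import com.fasterxml.jackson.databind.ObjectMapper
> import scala.collection.JavaConverters._
>
> // Poll the queue that receives s3:ObjectCreated:* notifications and pull
> // out the new object keys instead of listing the bucket every microbatch.
> val sqs = AmazonSQSClientBuilder.defaultClient()
> val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events"
> val mapper = new ObjectMapper()
>
> val request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)
> val newKeys = sqs.receiveMessage(request).getMessages.asScala.flatMap { msg =>
>   val records = mapper.readTree(msg.getBody).path("Records").elements.asScala
>   val keys = records.map(_.path("s3").path("object").path("key").asText()).toList
>   sqs.deleteMessage(queueUrl, msg.getReceiptHandle) // ack once keys are extracted
>   keys
> }
> {code}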
> I hope to contribute the changes proposed in [this pull
> request|https://github.com/apache/spark/pull/24934] to Apache Bahir, as
> suggested by [gaborgsomogyi|https://github.com/gaborgsomogyi]
> [here|https://github.com/apache/spark/pull/24934#issuecomment-511389130]