[jira] [Updated] (SPARK-27676) InMemoryFileIndex should hard-fail on missing files instead of logging and continuing

2019-05-10 Thread Josh Rosen (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-27676:
---
Description: 
Spark's {{InMemoryFileIndex}} contains two places where a {{FileNotFoundException}} 
is caught and logged as a warning (during [directory 
listing|https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L274]
 and [block location 
lookup|https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L333]).
 I think that this is a dangerous default behavior and would prefer that Spark 
hard-fails by default (with the ignore-and-continue behavior guarded by a SQL 
session configuration).
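
For concreteness, the pattern in question looks roughly like the sketch below. This is a paraphrase of the catch-and-warn behavior described above, not the actual {{InMemoryFileIndex}} source, and the {{ignoreMissingFiles}} parameter stands in for whatever SQL session configuration would guard the behavior:
{code:scala}
// Paraphrased sketch of the catch-and-warn pattern described above; this is
// NOT the actual InMemoryFileIndex code. The ignoreMissingFiles parameter
// stands in for a hypothetical SQL session configuration.
import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

def listLeafFiles(
    fs: FileSystem,
    dir: Path,
    ignoreMissingFiles: Boolean): Seq[FileStatus] = {
  try {
    fs.listStatus(dir).toSeq
  } catch {
    case _: FileNotFoundException if ignoreMissingFiles =>
      // Today's default: warn and pretend the directory is empty.
      Console.err.println(s"The directory $dir was not found. Was it deleted very recently?")
      Nil
    // With ignoreMissingFiles = false the exception propagates and the job
    // hard-fails, which is the default this ticket argues for.
  }
}
{code}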

In SPARK-17599 and SPARK-24364, logic was added to ignore missing files. 
Quoting from the PR for SPARK-17599:
{quote}The {{ListingFileCatalog}} lists files given a set of resolved paths. If 
a folder is deleted at any time between the paths were resolved and the file 
catalog can check for the folder, the Spark job fails. This may abruptly stop 
long running StructuredStreaming jobs for example.

Folders may be deleted by users or automatically by retention policies. These 
cases should not prevent jobs from successfully completing.
{quote}
Let's say that I'm *not* expecting to ever delete input files for my job. In 
that case, this behavior can mask bugs.

One straightforward class of masked bug is accidental file deletion: if I never 
expect files to be deleted, then I'd prefer that my job fail when Spark 
encounters a missing file.

A more subtle bug can occur when using an S3 filesystem. Say I'm running a Spark 
job against a partitioned Parquet dataset which is laid out like this:
{code:java}
data/
  date=1/
    region=west/
      0.parquet
      1.parquet
    region=east/
      0.parquet
      1.parquet
{code}
If I do {{spark.read.parquet("/data/date=1/")}} then Spark needs to perform 
multiple rounds of file listing, first listing {{/data/date=1}} to discover the 
partitions for that date, then listing within each partition to discover the 
leaf files. Due to the eventual consistency of S3 ListObjects, it's possible 
that the first listing will show the {{region=west}} and {{region=east}} 
partitions existing, while the next-level listing fails to return any objects 
for some of those directories (e.g. {{/data/date=1/}} returns files but 
{{/data/date=1/region=west/}} throws a {{FileNotFoundException}} in S3A due to 
ListObjects inconsistency).
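
As a rough sketch of the two listing rounds described above (using the Hadoop {{FileSystem}} API directly; the bucket name and paths are just the example layout from earlier):
{code:scala}
// Rough sketch of the two listing rounds described above, using the Hadoop
// FileSystem API directly. The bucket name and paths are illustrative only.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val root = new Path("s3a://my-bucket/data/date=1/")
val fs = root.getFileSystem(new Configuration())

// Round 1: list the date directory to discover the partition directories
// (region=west/, region=east/).
val partitionDirs = fs.listStatus(root).filter(_.isDirectory).map(_.getPath)

// Round 2: list each partition directory to discover the leaf Parquet files.
// On S3A an inconsistent ListObjects result can make this call throw
// FileNotFoundException even though round 1 just reported the directory;
// today InMemoryFileIndex logs a warning and skips it, whereas this ticket
// proposes propagating the failure by default.
val leafFiles = partitionDirs.flatMap(dir => fs.listStatus(dir).filter(_.isFile))
{code}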

If Spark propagated the {{FileNotFoundException}} and hard-failed, then the job 
would fail in exactly those cases where we _definitely_ know that the S3 listing 
is inconsistent. Failing here doesn't guard against _all_ potential S3 list 
inconsistency issues (e.g. back-to-back listings which both return a subset of 
the true set of objects), but I think it's still an improvement to fail for the 
subset of cases that we _can_ detect, even if that's not a surefire failsafe 
against the more general problem.

Finally, I'm unsure whether the original patch has the desired effect: if a 
file is deleted after a Spark job has resolved it for reading, that can cause 
problems at multiple layers, both in the driver (multiple rounds of file 
listing) and in the executors (if the deletion occurs after the construction of 
the catalog but before the read tasks are scheduled). I think the original 
patch only resolved the problem for the driver (unless I'm missing similar 
executor-side code specific to the original streaming use case).

Given all of these reasons, I think that the "ignore potentially deleted files 
during file index listing" behavior should be guarded behind a feature flag 
which defaults to {{false}}, consistent with the existing 
{{spark.files.ignoreMissingFiles}} and {{spark.sql.files.ignoreMissingFiles}} 
flags (which both default to {{false}}).
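
For reference, a minimal sketch of how the existing flags are set today (the config keys are the real ones named above; the session-builder usage and application name are just illustrative):
{code:scala}
// Minimal sketch of setting the existing flags named above; both default to
// false, shown here explicitly only for illustration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ignore-missing-files-example")
  .config("spark.files.ignoreMissingFiles", "false")
  .config("spark.sql.files.ignoreMissingFiles", "false")
  .getOrCreate()
{code}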
