[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16279443#comment-16279443
 ] 

ASF GitHub Bot commented on BEAM-3030:
--

asfgit closed pull request #4190: [BEAM-3030] Adds a deduplication key to 
Watch, and uses it to handle growing files in FileIO.match
URL: https://github.com/apache/beam/pull/4190

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index a244c070129..4e7124af8a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -33,13 +33,17 @@
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
 import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -69,6 +73,11 @@
    * By default, a filepattern matching no resources is treated according to {@link
    * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
    * Match#withEmptyMatchTreatment}.
+   *
+   * Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
    */
   public static Match match() {
 return new AutoValue_FileIO_Match.Builder()
@@ -317,13 +326,17 @@ public MatchAll continuously(
 "Match filepatterns",
 ParDo.of(new MatchFn(getConfiguration().getEmptyMatchTreatment())));
   } else {
-res = input
-.apply(
-"Continuously match filepatterns",
-Watch.growthOf(new MatchPollFn())
-.withPollInterval(getConfiguration().getWatchInterval())
-.withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
-.apply(Values.create());
+res =
+input
+.apply(
+"Continuously match filepatterns",
+Watch.growthOf(
+Contextful.<PollFn<String, MatchResult.Metadata>>of(
+new MatchPollFn(), Requirements.empty()),
+new ExtractFilenameFn())
+.withPollInterval(getConfiguration().getWatchInterval())
+.withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+.apply(Values.create());
   }
   return res.apply(Reshuffle.viaRandomKey());
 }
@@ -346,7 +359,7 @@ public void process(ProcessContext c) throws Exception {
   }
 }
 
-private static class MatchPollFn extends Watch.Growth.PollFn<String, MatchResult.Metadata> {
+private static class MatchPollFn extends PollFn<String, MatchResult.Metadata> {
   @Override
   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
   throws Exception {
@@ -354,6 +367,14 @@ public void process(ProcessContext c) throws Exception {
 Instant.now(), FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata());
   }
 }
+
+private static class ExtractFilenameFn
+implements SerializableFunction<MatchResult.Metadata, String> {
+  @Override
+  public String apply(MatchResult.Metadata input) {
+return input.resourceId().toString();
+  }
+}
   }
 
   /** Implementation of {@link #readMatches}. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 75c2fe45b80..4b31ae71333 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ 

[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-11-28 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269863#comment-16269863
 ] 

Eugene Kirpichov commented on BEAM-3030:


Fix in https://github.com/apache/beam/pull/4190

> watchForNewFiles() can emit a file multiple times if it's growing
> -----------------------------------------------------------------
>
>                 Key: BEAM-3030
>                 URL: https://issues.apache.org/jira/browse/BEAM-3030
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>             Fix For: 2.3.0
>
>
> TextIO and AvroIO watchForNewFiles(), as well as 
> FileIO.match().continuously(), use the Watch transform under the hood and 
> watch the set of Metadata matching a filepattern.
> Two Metadata objects with the same filename but different sizes are not 
> considered equal, so if these transforms observe the same file multiple 
> times with different sizes, they'll read the file multiple times.
> This is likely not yet a problem for production users: these features 
> require SDF, which is supported only in the Dataflow runner, and users of 
> the Dataflow runner are likely to use only files on GCS, which doesn't 
> support appends. However, it still needs to be fixed.
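
The failure mode described in the issue can be reproduced without Beam. The following is only a minimal sketch: FileMeta is a hypothetical stand-in for MatchResult.Metadata (not the real class), and emitNew() is an assumed simplification of "emit every output that has not been seen before", not the Watch transform's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class GrowingFileDuplicates {

  /** Hypothetical stand-in for file metadata: equality covers both name and size. */
  public static final class FileMeta {
    final String name;
    final long sizeBytes;

    public FileMeta(String name, long sizeBytes) {
      this.name = name;
      this.sizeBytes = sizeBytes;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof FileMeta)) {
        return false;
      }
      FileMeta other = (FileMeta) o;
      return name.equals(other.name) && sizeBytes == other.sizeBytes;
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, sizeBytes);
    }
  }

  /** Emits each observation that is not equal to any earlier one (whole-value dedup). */
  public static List<FileMeta> emitNew(List<List<FileMeta>> polls) {
    Set<FileMeta> seen = new HashSet<>();
    List<FileMeta> emitted = new ArrayList<>();
    for (List<FileMeta> poll : polls) {
      for (FileMeta meta : poll) {
        if (seen.add(meta)) {
          emitted.add(meta);
        }
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    // The same file is seen in two polling rounds while it grows from 10 to 25 bytes.
    List<List<FileMeta>> polls =
        Arrays.asList(
            Arrays.asList(new FileMeta("a.txt", 10)),
            Arrays.asList(new FileMeta("a.txt", 25)));
    System.out.println(emitNew(polls).size()); // prints 2: a.txt is emitted twice
  }
}
```

Because the size participates in equality, the second observation of a.txt looks like a new output, which is why a downstream read would process the file twice.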



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-11-28 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269802#comment-16269802
 ] 

Eugene Kirpichov commented on BEAM-3030:


This also happens in FileIOTest: 
https://builds.apache.org/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-direct-java/5317/testReport/junit/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/


[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197478#comment-16197478
 ] 

ASF GitHub Bot commented on BEAM-3030:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3957



[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16195328#comment-16195328
 ] 

ASF GitHub Bot commented on BEAM-3030:
--

GitHub user jkff opened a pull request:

https://github.com/apache/beam/pull/3957

Fixes TextIO and AvroIO tests of watchForNewFiles

* AvroIO: the test needs to specify a trigger so that files are really 
generated continuously and the test of watchForNewFiles is non-vacuous.

* TextIO: files were generated by hand-written code, so writing a file 
could sometimes race with TextIO reading it. TextIO might then see the same 
file with two different sizes, count it as two different files (two Metadata 
objects for the same filename with different sizes are not equal), and read 
the file twice.

It makes sense to address that separately, e.g. by allowing the Watch 
transform to take a key extractor, but that is outside the scope of this PR 
and is tracked in https://issues.apache.org/jira/browse/BEAM-3030.
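
The key-extractor idea mentioned above can be sketched in plain Java as follows. This is an illustration only: KeyedGrowthTracker, newOutputs() and file() are hypothetical names invented for this sketch, and a (filename, size) map entry stands in for the real Metadata type. It is not the Watch transform's API or implementation.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class KeyedGrowthTracker<T, K> {
  private final Function<T, K> keyFn;
  private final Set<K> seenKeys = new HashSet<>();

  /** keyFn extracts the deduplication key, e.g. the filename, from each output. */
  public KeyedGrowthTracker(Function<T, K> keyFn) {
    this.keyFn = keyFn;
  }

  /** Returns the outputs of this poll whose key was not seen in any earlier output. */
  public List<T> newOutputs(List<T> pollResult) {
    List<T> fresh = new ArrayList<>();
    for (T output : pollResult) {
      if (seenKeys.add(keyFn.apply(output))) {
        fresh.add(output);
      }
    }
    return fresh;
  }

  /** Hypothetical helper: a (filename, size) pair standing in for file metadata. */
  public static Map.Entry<String, Long> file(String name, long sizeBytes) {
    return new SimpleImmutableEntry<>(name, sizeBytes);
  }

  public static void main(String[] args) {
    KeyedGrowthTracker<Map.Entry<String, Long>, String> tracker =
        new KeyedGrowthTracker<>(Map.Entry::getKey);

    // First poll: a.txt appears with 10 bytes.
    System.out.println(tracker.newOutputs(Arrays.asList(file("a.txt", 10))).size()); // prints 1

    // Second poll: a.txt has grown, b.txt is new. Only b.txt counts as a new output.
    List<Map.Entry<String, Long>> second = Arrays.asList(file("a.txt", 25), file("b.txt", 5));
    System.out.println(tracker.newOutputs(second).size()); // prints 1
  }
}
```

Keying on the filename means the first observed metadata wins and later size changes are ignored, which matches the behavior the later fix documents for FileIO.match().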

R: @reuvenlax 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkff/incubator-beam read-watch-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3957.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3957


commit 59b450d82917707a0802c60cf910c998215cbca4
Author: Eugene Kirpichov 
Date:   2017-10-06T20:29:10Z

Fixes TextIO and AvroIO tests of watchForNewFiles

* AvroIO: Need to specify a trigger to make sure that files are really
generated continuously and testing of watchForNewFiles is non-vacuous.

* TextIO: files were generated by manual code,
and sometimes writing of a file could race with TextIO reading it, and
it might see the same file with two different sizes, and count it as two
different files (two Metadata objects for the same filename with
different sizes are not equal) and read the file twice.

It makes sense to address that separately: e.g. in the Watch transform
allow specifying a key extractor - but it's outside the scope of this
PR.



