Thanks Ben, the Watch transform (https://github.com/apache/beam/pull/3565, in review) is implemented in a way that is forward-compatible with your ideas, though I didn't go all the way and implement them - I left a couple of TODOs.
In other good news - I have a PR in review (https://github.com/apache/beam/pull/3607) for incrementally reading new files matching a filepattern in TextIO, based on Watch! The changes TextIO needs to support this are small, and they are just as easy to port to other file-based IOs. This is the first practical example of an SDF-based connector, and one that users have been requesting for a long time.

On Wed, Jul 12, 2017 at 9:03 AM Ben Chambers wrote:
> Regarding changing the coder -- keep in mind that there may be persisted state somewhere, so we can't just change the coder once this is used.
>
> If the process of scanning for modified and new files reported the last-modified-time, could we use that and have the SDF report KV<filename, timestamp> with the last-modified-time as the timestamp? If we could do that, and there was a "watermark" that tracked the latest last-modified-time that we have processed, then we could use per-key state to store how far a given filename has been processed, but set an event-time timer to go off when the watermark indicates that all files have been processed up to that point. This would allow the state to be garbage collected.
>
> On Wed, Jul 12, 2017 at 7:50 AM Reuven Lax wrote:
>> Yes, you still need SDF to do the root expansion. However, it means that the state storage is now distributed.
>>
>> Garbage collection might be trickier with Distinct.
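To make Ben's garbage-collection idea concrete, here is a minimal single-process sketch in plain Java; it uses no Beam APIs, and the class `WatermarkDedup` and its methods are hypothetical. It assumes each poll returns a complete listing of filename to last-modified-time, and that no file later shows up with a last-modified-time behind the watermark. The watermark stands in for the event-time timer: once it passes a file's last-modified-time, that file's per-key state is dropped, because the watermark alone is enough to filter the file out on later polls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of watermark-based GC of per-file state.
// Assumption: last-modified-times never appear behind the watermark.
final class WatermarkDedup {
    // Latest last-modified-time known to be fully processed.
    private long watermark = Long.MIN_VALUE;
    // Per-key state: filename -> last-modified-time already emitted,
    // kept only for files the watermark has not yet passed.
    private final Map<String, Long> state = new HashMap<>();

    /** Processes one complete listing and returns the filenames to emit. */
    List<String> poll(Map<String, Long> listing) {
        List<String> emitted = new ArrayList<>();
        long maxSeen = watermark;
        // TreeMap gives a deterministic (sorted) output order.
        for (Map.Entry<String, Long> e : new TreeMap<>(listing).entrySet()) {
            String name = e.getKey();
            long mtime = e.getValue();
            if (mtime < watermark) {
                continue; // behind the watermark: assumed already processed
            }
            Long previous = state.get(name);
            if (previous == null || previous != mtime) {
                emitted.add(name); // new file, or modified since last emitted
                state.put(name, mtime);
            }
            maxSeen = Math.max(maxSeen, mtime);
        }
        watermark = maxSeen;
        // The "event-time timer" fires: drop state the watermark has passed.
        // Entries equal to the watermark are kept so ties are not re-emitted.
        state.values().removeIf(t -> t < watermark);
        return emitted;
    }

    int stateSize() {
        return state.size();
    }

    public static void main(String[] args) {
        WatermarkDedup dedup = new WatermarkDedup();
        System.out.println(dedup.poll(Map.of("a.log", 1L, "b.log", 2L)));
        System.out.println(dedup.poll(Map.of("a.log", 1L, "b.log", 2L, "c.log", 3L)));
        System.out.println(dedup.stateSize());
    }
}
```

In this sketch the state holds only files at the watermark, so it stays small however many files accumulate; the trade-off is that a file arriving late with an old last-modified-time is silently skipped.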
>>
>> On Tue, Jul 11, 2017 at 10:19 PM, Eugene Kirpichov wrote:
>>> Yes, I thought of this, but:
>>> - The distinct transform needs to apply per input (probably easy)
>>> - You still need an SDF to run the set expansion repeatedly
>>> - It's not clear when to terminate the repeated expansion in this implementation
>>>
>>> On Tue, Jul 11, 2017 at 10:14 PM Reuven Lax wrote:
>>>> As a thought experiment: could this be done by expanding the set into a PCollection and running it through a Distinct (in the global window, trigger every element) transform?
>>>>
>>>> On Tue, Jul 11, 2017 at 9:48 PM, Eugene Kirpichov wrote:
>>>>> In the current version, the transform is intended to watch a set that is continuously growing; do you mean a GCS bucket that eventually contains more files than can fit in a state tag?
>>>>>
>>>>> I agree that this will eventually become an issue; I can see a couple of solutions:
>>>>> - I suspect many such sets are highly compressible, so we can use a coder that compresses things and get some headroom.
>>>>> - When an element disappears from a set, we can remove it from the state (without emitting anything into the transform's output - just for GC purposes). Of course, this assumes that elements actually disappear from the set (e.g. get removed from the GCS bucket).
>>>>> - There might be a way to shard the set using a GBK.
>>>>>   I'm not quite sure how it would look in the transform, in particular what the termination condition would look like, because polling would need to happen before the GBK, and termination conditions such as "no new elements observed" depend on information in shards that's after the GBK.
>>>>>
>>>>> On Tue, Jul 11, 2017 at 9:23 PM Reuven Lax wrote:
>>>>>> BTW - I am worried about SDF storing everything in a single tag for Watch. The problem is that a streaming pipeline can run "forever." So someone watching a GCS bucket "forever" will eventually crash due to the value getting too large. Is there any reasonable way to garbage collect this state?
>>>>>>
>>>>>> On Tue, Jul 11, 2017 at 9:08 PM, Eugene Kirpichov wrote:
>>>>>>> First PR has been submitted - enjoy TextIO.readAll(), which reads a PCollection of filenames!
>>>>>>> I've started working on the SDF-based Watch transform http://s.apache.org/beam-watch-transform, and after that will be able to implement the incremental features in TextIO.
>>>>>>>
>>>>>>> On Tue, Jun 27, 2017 at 1:55 PM Eugene Kirpichov wrote:
>>>>>>>> Thanks all.
>>>>>>>> The first PR is out for review: https://github.com/apache/beam/pull/3443
>>>>>>>> Next work (watching for new files) is in progress, based on https://github.com/apache/beam/pull/3360
>>>>>>>>
>>>>>>>> On Tue, Jun 27, 2017 at 11:22 AM Kenneth Knowles wrote:
>>>>>>>>> +1
>>>>>>>>>
>>>>>>>>> This is a really nice doc and plan.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 27, 2017 at 1:49 AM, Aljoscha Krettek wrote:
>>>>>>>>>> +1
>>>>>>>>>>
>>>>>>>>>> This sounds very good and there is a clear implementation path!
>>>>>>>>>>
>>>>>>>>>> On 24. Jun 2017, at 20:55, Jean-Baptiste Onofré wrote:
>>>>>>>>>>> Fair enough ;)
>>>>>>>>>>>
>>>>>>>>>>> Let me review the different JIRAs and provide some feedback.
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> JB
>>>>>>>>>>>
>>>>>>>>>>> On Jun 24, 2017, 20:54, at 20:54, Eugene Kirpichov wrote:
>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>> I haven't yet thought about how this work can be parallelized. For now I'd like to just get feedback on the approach :)
>>>>>>>>>>>> But glad that you're willing to help out - let's discuss this too a bit later!
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Jun 24, 2017 at 11:51 AM Jean-Baptiste Onofré wrote:
>>>>>>>>>>>>> Thanks Eugene
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will pick up some.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> JB
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Jun 24, 2017, 20:00, at 20:00, Eugene Kirpichov wrote:
>>>>>>>>>>>>>> Filed JIRAs for the proposed features and linked them with the doc:
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-2511 TextIO should support reading a PCollection of filenames
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-2512 TextIO should support watching for new files
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-2513 TextIO should support watching files for new entries
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jun 23, 2017 at 4:32 PM Eugene Kirpichov wrote:
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I've written up a proposal for incrementally delivering a bunch of useful new features in TextIO based on Splittable DoFn. It's applicable to other file-based connectors; TextIO is just one good example. Let me know what you think!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://s.apache.org/textio-sdf
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Copy of abstract:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Users have often expressed interest in several new features for reading files - in particular, incremental reading of log files (streaming of new files matching a pattern and new entries in each file) and reading a PCollection of filenames (in particular, an unbounded collection arriving from a stream such as PubSub or Kafka).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Splittable DoFn <http://s.apache.org/splittable-do-fn> (SDF) enables these features. This document proposes an API for them, using the example of TextIO, and proposes a plan for delivering them subject to availability of SDF in different runners. Some availability constraints are circumvented by Running Splittable DoFn via Source API <http://s.apache.org/sdf-via-source>.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> TL;DR Read a collection of filepatterns arriving on PubSub via files.apply(TextIO.readEach()). Tail a filepattern via TextIO.read().watchForNewFiles().watchFilesForNewEntries(). Coming to a Beam SDK near you in small pieces.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think I'm gonna start working on the first steps of the proposed plan, in parallel with this discussion, because I'm excited :)
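The Watch transform and the proposed TextIO.read().watchForNewFiles() discussed in this thread come down to one loop: repeatedly poll a set (for example, the files matching a filepattern), emit only elements not seen before, and stop when a termination condition such as "no new elements observed" holds. Below is a minimal single-process Java sketch of that loop with no Beam dependency. `PollingWatcher`, `pollFn` and `maxIdlePolls` are hypothetical names, and "N consecutive polls with no new output" is just one example of a termination condition. Like the single state tag Reuven is worried about, the `seen` set here grows without bound for as long as the watched set keeps growing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of a Watch-style polling loop (no Beam APIs).
final class PollingWatcher {
    /**
     * Polls pollFn until maxIdlePolls consecutive polls produce no new
     * elements, returning every element in the order it was first seen.
     */
    static List<String> watch(Supplier<Set<String>> pollFn, int maxIdlePolls) {
        Set<String> seen = new HashSet<>(); // grows with the watched set
        List<String> output = new ArrayList<>();
        int idlePolls = 0;
        while (idlePolls < maxIdlePolls) {
            boolean sawNew = false;
            for (String element : pollFn.get()) {
                if (seen.add(element)) { // add() returns false for duplicates
                    output.add(element);
                    sawNew = true;
                }
            }
            // Termination condition: "no new elements observed" for a while.
            idlePolls = sawNew ? 0 : idlePolls + 1;
        }
        return output;
    }

    public static void main(String[] args) {
        // Simulated listings of a growing filepattern, one per poll;
        // after the last listing, the set stops changing.
        List<Set<String>> listings = List.of(
                Set.of("a.txt"),
                Set.of("a.txt", "b.txt"),
                Set.of("a.txt", "b.txt"),
                Set.of("a.txt", "b.txt", "c.txt"));
        int[] pollCount = {0};
        Supplier<Set<String>> pollFn = () ->
                listings.get(Math.min(pollCount[0]++, listings.size() - 1));
        System.out.println(watch(pollFn, 2));
    }
}
```

Note that the termination decision needs the global `seen` set, which is why sharding that set behind a GBK makes conditions like "no new elements observed" hard to evaluate.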
