Thanks Ben, the Watch transform (https://github.com/apache/beam/pull/3565,
in review) is implemented in a way forward-compatible with your ideas,
though I didn't go all the way and implement them - I left a couple of
TODOs.

In other good news - I have a PR in review for incrementally reading new
files matching a filepattern in TextIO, based on Watch! The changes to
TextIO to support this are essentially trivial, and should be just as easy
to port to other file-based IOs.

https://github.com/apache/beam/pull/3607

This is the first practical example of an SDF-based connector, something
users have been requesting for a long time.
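
For anyone who hasn't read the PRs: the core idea of watching a growing set can be modeled in a few lines. This is a toy sketch in plain Python, not the Beam API and not the code in the PR; `watch_for_new`, `max_idle_polls` and `fake_poll` are names made up for illustration, and "stop after N polls with nothing new" is just one assumed termination condition.

```python
from typing import Callable, Iterable, Iterator, List, Set


def watch_for_new(
    poll: Callable[[], Iterable[str]],
    max_idle_polls: int,
) -> Iterator[str]:
    """Yield each element the first time a poll returns it.

    Stops after `max_idle_polls` consecutive polls that return nothing new
    (an assumed stand-in for a "no new output for a while" condition).
    """
    seen: Set[str] = set()  # everything emitted so far; this is the state that grows
    idle = 0
    while idle < max_idle_polls:
        new = [f for f in sorted(set(poll())) if f not in seen]
        if new:
            idle = 0
            for f in new:
                seen.add(f)
                yield f
        else:
            idle += 1


# Simulated listings of a filepattern, one per poll round.
listings = [
    ["a.txt"],
    ["a.txt", "b.txt"],
    ["a.txt", "b.txt"],
    ["a.txt", "b.txt", "c.txt"],
]
rounds = iter(listings)


def fake_poll() -> List[str]:
    # Once the scripted rounds run out, keep returning the final listing.
    return next(rounds, listings[-1])


emitted = list(watch_for_new(fake_poll, max_idle_polls=2))
print(emitted)
```

Each file comes out exactly once, in the round where it first shows up, and the `seen` set is the per-input state whose size is discussed further down the thread.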

On Wed, Jul 12, 2017 at 9:03 AM Ben Chambers <bchamb...@google.com.invalid>
wrote:

> Regarding changing the coder -- keep in mind that there may be persisted
> state somewhere, so we can't just change the coder once this is used.
>
> If the process of scanning for modified and new files reported the
> last-modified-time, could we use that and have the SDF report KV<filename,
> timestamp> with the last-modified-time as the timestamp? If we could do
> that, and there was a "watermark" that tracked the latest
> last-modified-time that we have processed, then we can use per-key state to
> store how far a given filename has been processed, but set an event time
> timer to go off when the watermark indicates all files have been processed
> up to that point. This would allow the state to be garbage collected.
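
A rough way to picture this suggestion, as a toy model in plain Python rather than Beam state and timer code: `PerFileState`, `observe` and `advance_watermark` are hypothetical names, and the rule "a timer fires once the watermark reaches the file's last-modified time" is my assumption about the intended semantics.

```python
from typing import Dict


class PerFileState:
    """Toy model of per-key state plus one event-time timer per key."""

    def __init__(self) -> None:
        self.offset: Dict[str, int] = {}   # filename -> how far it has been processed
        self.timer: Dict[str, float] = {}  # filename -> event time at which to GC

    def observe(self, filename: str, last_modified: float, size: int) -> None:
        # Record progress and (re)set the timer to the file's last-modified time.
        self.offset[filename] = size
        self.timer[filename] = last_modified

    def advance_watermark(self, watermark: float) -> None:
        # Fire every timer the watermark has reached; firing drops that key's state.
        fired = [name for name, t in self.timer.items() if t <= watermark]
        for name in fired:
            del self.timer[name]
            del self.offset[name]


state = PerFileState()
state.observe("old.log", last_modified=100.0, size=10)
state.observe("new.log", last_modified=200.0, size=20)
state.advance_watermark(150.0)  # everything modified up to t=150 is done
remaining = sorted(state.offset)
print(remaining)
```

Only the state for files at or behind the watermark is dropped, so the amount of state tracks the files still in flight rather than every file ever seen.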
>
> On Wed, Jul 12, 2017 at 7:50 AM Reuven Lax <re...@google.com.invalid>
> wrote:
>
> > Yes, you still need SDF to do the root expansion. However it means that
> > the state storage is now distributed.
> >
> > Garbage collection might be trickier with Distinct.
> >
> > On Tue, Jul 11, 2017 at 10:19 PM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > > Yes, I thought of this, but:
> > > - The distinct transform needs to apply per input (probably easy)
> > > - You still need an SDF to run the set expansion repeatedly
> > > - It's not clear when to terminate the repeated expansion in this
> > > implementation
> > >
> > > On Tue, Jul 11, 2017 at 10:14 PM Reuven Lax <re...@google.com.invalid>
> > > wrote:
> > >
> > > > As a thought experiment: could this be done by expanding the set into
> > > > a PCollection and running it through a Distinct (in the global window,
> > > > trigger every element) transform?
> > > >
> > > > On Tue, Jul 11, 2017 at 9:48 PM, Eugene Kirpichov <
> > > > kirpic...@google.com.invalid> wrote:
> > > >
> > > > > In the current version, the transform is intended to watch a set
> > > > > that is continuously growing; do you mean a GCS bucket that
> > > > > eventually contains more files than can fit in a state tag?
> > > > >
> > > > > I agree that this will eventually become an issue; I can see a
> > > > > couple of solutions:
> > > > > - I suspect many such sets are highly compressible, so we can use a
> > > > > coder that compresses things and get some headroom.
> > > > > - When an element disappears from a set, we can remove it from the
> > > > > state (without emitting anything into the transform's output - just
> > > > > for GC purposes). Of course this assumes that elements actually
> > > > > disappear from the set (e.g. get removed from the GCS bucket).
> > > > > - There might be a way to shard the set using a GBK. I'm not quite
> > > > > sure how it would look in the transform, in particular what the
> > > > > termination condition would look like - because polling would need
> > > > > to happen before the GBK, and termination conditions such as "no new
> > > > > elements observed" depend on information in shards that's after the
> > > > > GBK.
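
The first two options in the list above can be sketched in plain Python (a toy model, not Beam code): `poll_with_pruning` is a hypothetical helper, and the `gs://some-bucket/...` names are invented purely to show how well filenames with a shared prefix compress.

```python
import zlib
from typing import List, Set


def poll_with_pruning(seen: Set[str], current: Set[str]) -> List[str]:
    """Return newly appeared elements and drop vanished ones from `seen`."""
    new = sorted(current - seen)
    gone = seen - current
    seen.difference_update(gone)  # GC only: nothing is emitted for these
    seen.update(new)
    return new


seen: Set[str] = set()
first = poll_with_pruning(seen, {"a.txt", "b.txt"})
second = poll_with_pruning(seen, {"b.txt", "c.txt"})  # a.txt was deleted
print(first, second, sorted(seen))

# Compression headroom: hypothetical filenames sharing a long common prefix.
names = ["gs://some-bucket/logs/2017-07-11/part-%05d.txt" % i for i in range(1000)]
raw = "\n".join(names).encode("utf-8")
packed = zlib.compress(raw)
print(len(raw), len(packed))
```

One consequence of pruning worth keeping in mind: an element that disappears and later reappears would be emitted a second time, because the state no longer remembers it.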
> > > > >
> > > > > On Tue, Jul 11, 2017 at 9:23 PM Reuven Lax <re...@google.com.invalid>
> > > > > wrote:
> > > > >
> > > > > > BTW - I am worried about SDF storing everything in a single tag
> > > > > > for watch. The problem is that a streaming pipeline can run
> > > > > > "forever." So someone watching a GCS bucket "forever" will
> > > > > > eventually crash due to the value getting too large. Is there any
> > > > > > reasonable way to garbage collect this state?
> > > > > >
> > > > > > On Tue, Jul 11, 2017 at 9:08 PM, Eugene Kirpichov <
> > > > > > kirpic...@google.com.invalid> wrote:
> > > > > >
> > > > > > > First PR has been submitted - enjoy TextIO.readAll() which reads
> > > > > > > a PCollection of filenames!
> > > > > > > I've started working on the SDF-based Watch transform
> > > > > > > http://s.apache.org/beam-watch-transform, and after that will be
> > > > > > > able to implement the incremental features in TextIO.
> > > > > > >
> > > > > > > On Tue, Jun 27, 2017 at 1:55 PM Eugene Kirpichov
> > > > > > > <kirpic...@google.com> wrote:
> > > > > > >
> > > > > > > > Thanks all. The first PR is out for review:
> > > > > > > > https://github.com/apache/beam/pull/3443
> > > > > > > > Next work (watching for new files) is in progress, based on
> > > > > > > > https://github.com/apache/beam/pull/3360
> > > > > > > >
> > > > > > > > On Tue, Jun 27, 2017 at 11:22 AM Kenneth Knowles
> > > > > > > > <k...@google.com.invalid> wrote:
> > > > > > > >
> > > > > > > >> +1
> > > > > > > >>
> > > > > > > >> This is a really nice doc and plan.
> > > > > > > >>
> > > > > > > >> On Tue, Jun 27, 2017 at 1:49 AM, Aljoscha Krettek
> > > > > > > >> <aljos...@apache.org> wrote:
> > > > > > > >>
> > > > > > > >> > +1
> > > > > > > >> >
> > > > > > > >> > This sounds very good and there is a clear implementation path!
> > > > > > > >> >
> > > > > > > >> > > On 24. Jun 2017, at 20:55, Jean-Baptiste Onofré
> > > > > > > >> > > <j...@nanthrax.net> wrote:
> > > > > > > >> > >
> > > > > > > >> > > Fair enough ;)
> > > > > > > >> > >
> > > > > > > >> > > Let me review the different Jira and provide some feedback.
> > > > > > > >> > >
> > > > > > > >> > > Regards
> > > > > > > >> > > JB
> > > > > > > >> > >
> > > > > > > >> > > On Jun 24, 2017, 20:54, at 20:54, Eugene Kirpichov
> > > > > > > >> > <kirpic...@google.com.INVALID> wrote:
> > > > > > > >> > >> Hi JB,
> > > > > > > >> > >> I haven't yet thought about how this work can be parallelized. For
> > > > > > > >> > >> now I'd like to just get feedback on the approach :)
> > > > > > > >> > >> But glad that you're willing to help out - let's discuss this too a
> > > > > > > >> > >> bit later!
> > > > > > > >> > >>
> > > > > > > >> > >> On Sat, Jun 24, 2017 at 11:51 AM Jean-Baptiste Onofré
> > > > > > > >> > >> <j...@nanthrax.net> wrote:
> > > > > > > >> > >>
> > > > > > > >> > >>> Thanks Eugene
> > > > > > > >> > >>>
> > > > > > > >> > >>> I will pick up some.
> > > > > > > >> > >>>
> > > > > > > >> > >>> Regards
> > > > > > > >> > >>> JB
> > > > > > > >> > >>>
> > > > > > > >> > >>> On Jun 24, 2017, 20:00, at 20:00, Eugene Kirpichov
> > > > > > > >> > >>> <kirpic...@google.com.INVALID> wrote:
> > > > > > > >> > >>>> Filed JIRAs for the proposed features and linked with the doc:
> > > > > > > >> > >>>> https://issues.apache.org/jira/browse/BEAM-2511 TextIO should
> > > > > > > >> > >>>> support reading a PCollection of filenames
> > > > > > > >> > >>>> https://issues.apache.org/jira/browse/BEAM-2512 TextIO should
> > > > > > > >> > >>>> support watching for new files
> > > > > > > >> > >>>> https://issues.apache.org/jira/browse/BEAM-2513 TextIO should
> > > > > > > >> > >>>> support watching files for new entries
> > > > > > > >> > >>>>
> > > > > > > >> > >>>> On Fri, Jun 23, 2017 at 4:32 PM Eugene Kirpichov
> > > > > > > >> > >>>> <kirpic...@google.com> wrote:
> > > > > > > >> > >>>>
> > > > > > > >> > >>>>> Hi all,
> > > > > > > >> > >>>>>
> > > > > > > >> > >>>>> I've written up a proposal for incrementally delivering a bunch
> > > > > > > >> > >>>>> of useful new features in TextIO based on Splittable DoFn. It's
> > > > > > > >> > >>>>> applicable to other file-based connectors; TextIO is just one
> > > > > > > >> > >>>>> good example. Let me know what you think!
> > > > > > > >> > >>>>>
> > > > > > > >> > >>>>> https://s.apache.org/textio-sdf
> > > > > > > >> > >>>>>
> > > > > > > >> > >>>>> Copy of abstract:
> > > > > > > >> > >>>>>
> > > > > > > >> > >>>>> Users have often expressed interest in several new features for
> > > > > > > >> > >>>>> reading files - in particular, incremental reading of log files
> > > > > > > >> > >>>>> (streaming of new files matching a pattern and new entries in each
> > > > > > > >> > >>>>> file) and reading a PCollection of filenames (in particular, an
> > > > > > > >> > >>>>> unbounded collection arriving from a stream such as PubSub or
> > > > > > > >> > >>>>> Kafka).
> > > > > > > >> > >>>>>
> > > > > > > >> > >>>>> Splittable DoFn <http://s.apache.org/splittable-do-fn> (SDF) enables
> > > > > > > >> > >>>>> these features. This document proposes an API for them, using the
> > > > > > > >> > >>>>> example of TextIO, and proposes a plan for delivering them subject
> > > > > > > >> > >>>>> to availability of SDF in different runners. Some availability
> > > > > > > >> > >>>>> constraints are circumvented by Running Splittable DoFn via Source
> > > > > > > >> > >>>>> API <http://s.apache.org/sdf-via-source>.
> > > > > > > >> > >>>>>
> > > > > > > >> > >>>>> TL;DR Read a collection of filepatterns arriving on PubSub via
> > > > > > > >> > >>>>> files.apply(TextIO.readEach()). Tail a filepattern via
> > > > > > > >> > >>>>> TextIO.read().watchForNewFiles().watchFilesForNewEntries(). Coming
> > > > > > > >> > >>>>> to a Beam SDK near you in small pieces.
> > > > > > > >> > >>>>>
> > > > > > > >> > >>>>> I think I'm gonna start working on the first steps of the proposed
> > > > > > > >> > >>>>> plan, in parallel with this discussion, because I'm excited :)
> > > > > > > >> > >>>>>
> > > > > > > >> > >>>
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
