Looks good to me; that should do exactly what I need. Any thoughts on making TextSource public? I would be happy to submit the PR myself.
On Wed, Sep 27, 2017 at 5:56 PM, Eugene Kirpichov <[email protected]> wrote:

> Try TextIO.readAll() that takes a PCollection of filepatterns as input; and
> the PCollection can be produced by the UNLOAD step, emitting a single
> element (the filepattern) after the unload completes. It's available in
> Beam 2.2 which hasn't been released yet, but the release vote will likely
> start this week, and meanwhile you can run against a snapshot.
>
> On Wed, Sep 27, 2017 at 5:40 PM Jacob Marble <[email protected]> wrote:
>
> > After playing with this for a day, I can't figure out how to make step 2
> > start *after* step 1 completes.
> >
> > The natural way to accomplish step 2 is TextIO.Read, which only accepts
> > PBegin as input, and has no side input option. Wrapping TextIO.Read in an
> > adaptor doesn't help because the graph still must build with
> > TextIO.Read.expand(PBegin).
> >
> > I tried extending TextIO.Read directly, but that doesn't work because an
> > AutoValue class can't extend another.
> >
> > I thought about using TextSource directly, but it's package-private.
> >
> > So my thought is that either (1) TextIO.Read should accept side input, if
> > only to block execution until the side input is complete, or (2)
> > TextSource should be made public, probably the more natural of these two
> > options. For now, I'll have to go with (3) copy FileSource source code
> > into a new class that I can use directly.
> >
> > Does anyone have thoughts here?
> >
> > On Wed, Sep 27, 2017 at 9:25 AM, Jacob Marble <[email protected]> wrote:
> >
> > > Reuven, I think I found an example of the pattern you describe in
> > > JdbcIO.Read.expand(). Thanks for this.
> > >
> > > On Wed, Sep 27, 2017 at 9:13 AM, Reuven Lax <[email protected]>
> > > wrote:
> > >
> > >> Create is essentially a BoundedSource under the covers.
> > >>
> > >> There are multiple ways to handle step 3. One is to produce a
> > >> PCollection<String> containing the filenames. You could then attach a
> > >> Void key (using WithKeys), GBK the filenames together and delete in
> > >> the next step.
> > >>
> > >> Reuven
> > >>
> > >> On Wed, Sep 27, 2017 at 9:04 AM, Jacob Marble <[email protected]>
> > >> wrote:
> > >>
> > >> > Thanks, Reuven, that makes sense for step 1. After sending my
> > >> > original message, I started down the path of BoundedSource, but I
> > >> > think this could be better.
> > >> >
> > >> > Do you know any trick for step 3?
> > >> >
> > >> > On Wed, Sep 27, 2017 at 8:58 AM, Reuven Lax <[email protected]>
> > >> > wrote:
> > >> >
> > >> > > A common pattern is the following:
> > >> > >
> > >> > > p.apply(Create.of((Void) null))
> > >> > >  .apply(MapElements.via((Void v) -> /* once operation */));
> > >> > >
> > >> > > Of course, as is always the case with any Beam DoFn, your
> > >> > > operation might be executed multiple times (e.g. if something
> > >> > > fails before the runner commits the fact that the operation has
> > >> > > succeeded). You need to ensure that the operation is idempotent.
> > >> > >
> > >> > > Reuven
> > >> > >
> > >> > > On Wed, Sep 27, 2017 at 8:51 AM, Jacob Marble <[email protected]>
> > >> > > wrote:
> > >> > >
> > >> > > > I have been thinking on a Redshift reader/writer, basically to
> > >> > > > wrap UNLOAD and COPY in a PTransform. For example, steps to
> > >> > > > UNLOAD into a PCollection:
> > >> > > >
> > >> > > > 1) JDBC to Redshift - UNLOAD
> > >> > > > <http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html>
> > >> > > > TO 's3://bucket/tmp-prefix'
> > >> > > > 2) S3 to PCollection - work in progress
> > >> > > > <https://github.com/Kochava/beam-s3>
> > >> > > > 3) delete tmp files from S3
> > >> > > >
> > >> > > > To implement steps 1 and 3, I can't see a way to perform a task
> > >> > > > exactly once, globally, in a PTransform. Sure, I could do those
> > >> > > > steps in main() or even in a separate script, but the result
> > >> > > > isn't code that can be shared and reused very well.
> > >> > > >
> > >> > > > Am I missing something? Seems like the kind of problem that I
> > >> > > > shouldn't be the first to encounter.
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Jacob
> > >> >
> > >> > --
> > >> > Jacob
> > >
> > > --
> > > Jacob
> >
> > --
> > Jacob

--
Jacob
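Regarding the point in the thread that a "once" operation may be retried and so must be idempotent, here is a minimal sketch of what that could look like for step 3 (deleting the temporary files). It is only an illustration under assumptions: it uses the local filesystem via java.nio as a stand-in for S3, and the class name IdempotentCleanup and the method deleteAll are hypothetical, not part of Beam or of any S3 client.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: local files stand in for the S3 temp objects.
public class IdempotentCleanup {

    /**
     * Deletes every listed file, tolerating files that are already gone.
     * Returns how many files this particular call actually removed.
     */
    public static int deleteAll(List<Path> files) throws IOException {
        int removed = 0;
        for (Path file : files) {
            // deleteIfExists returns false (rather than throwing) when the
            // file is missing, so a retried call is a harmless no-op.
            if (Files.deleteIfExists(file)) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("tmp-prefix");
        Path a = Files.createFile(dir.resolve("part-0000"));
        Path b = Files.createFile(dir.resolve("part-0001"));
        List<Path> files = Arrays.asList(a, b);

        System.out.println(deleteAll(files)); // first attempt removes both files
        System.out.println(deleteAll(files)); // simulated retry finds nothing left
        Files.delete(dir);
    }
}
```

The same shape should carry over to a real delete call: treat "already deleted" as success instead of an error, so that a runner retrying the step after a failure cannot break the pipeline.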
