What would be the benefit of that, compared to what TextIO already exposes?
On Thu, Sep 28, 2017, 8:20 AM Jacob Marble <[email protected]> wrote:

> Looks good to me, should do exactly what I need.
>
> Any thoughts on making TextSource public? I would be happy to submit
> the PR myself.
>
> On Wed, Sep 27, 2017 at 5:56 PM, Eugene Kirpichov <[email protected]>
> wrote:
>
> > Try TextIO.readAll(), which takes a PCollection of filepatterns as
> > input; the PCollection can be produced by the UNLOAD step, emitting a
> > single element (the filepattern) after the unload completes. It's
> > available in Beam 2.2, which hasn't been released yet, but the release
> > vote will likely start this week, and meanwhile you can run against a
> > snapshot.
> >
> > On Wed, Sep 27, 2017 at 5:40 PM Jacob Marble <[email protected]>
> > wrote:
> >
> > > After playing with this for a day, I can't figure out how to make
> > > step 2 start *after* step 1 completes.
> > >
> > > The natural way to accomplish step 2 is TextIO.Read, which only
> > > accepts PBegin as input, and has no side input option. Wrapping
> > > TextIO.Read in an adaptor doesn't help because the graph still must
> > > build with TextIO.Read.expand(PBegin).
> > >
> > > I tried extending TextIO.Read directly, but that doesn't work
> > > because an AutoValue class can't extend another.
> > >
> > > I thought about using TextSource directly, but it's package-private.
> > >
> > > So my thought is that either (1) TextIO.Read should accept side
> > > input, if only to block execution until the side input is complete,
> > > or (2) TextSource should be made public, probably the more natural
> > > of these two options. For now, I'll have to go with (3) copy
> > > FileSource source code into a new class that I can use directly.
> > >
> > > Does anyone have thoughts here?
> > >
> > > On Wed, Sep 27, 2017 at 9:25 AM, Jacob Marble <[email protected]>
> > > wrote:
> > >
> > > > Reuven, I think I found an example of the pattern you describe in
> > > > JdbcIO.Read.expand(). Thanks for this.
> > > >
> > > > On Wed, Sep 27, 2017 at 9:13 AM, Reuven Lax <[email protected]>
> > > > wrote:
> > > >
> > > >> Create is essentially a BoundedSource under the covers.
> > > >>
> > > >> There are multiple ways to handle step 3. One is to produce a
> > > >> PCollection<String> containing the filenames. You could then
> > > >> attach a Void key (using WithKeys), GBK the filenames together
> > > >> and delete in the next step.
> > > >>
> > > >> Reuven
> > > >>
> > > >> On Wed, Sep 27, 2017 at 9:04 AM, Jacob Marble <[email protected]>
> > > >> wrote:
> > > >>
> > > >> > Thanks, Reuven, that makes sense for step 1. After sending my
> > > >> > original message, I started down the path of BoundedSource,
> > > >> > but I think this could be better.
> > > >> >
> > > >> > Do you know any trick for step 3?
> > > >> >
> > > >> > On Wed, Sep 27, 2017 at 8:58 AM, Reuven Lax <[email protected]>
> > > >> > wrote:
> > > >> >
> > > >> > > A common pattern is the following:
> > > >> > >
> > > >> > > p.apply(Create.of((Void) null))
> > > >> > >  .apply(MapElements.via((Void v) -> /* once operation */));
> > > >> > >
> > > >> > > Of course, as is always the case with any Beam DoFn, your
> > > >> > > operation might be executed multiple times (e.g. if
> > > >> > > something fails before the runner commits the fact that the
> > > >> > > operation has succeeded). You need to ensure that the
> > > >> > > operation is idempotent.
> > > >> > >
> > > >> > > Reuven
> > > >> > >
> > > >> > > On Wed, Sep 27, 2017 at 8:51 AM, Jacob Marble
> > > >> > > <[email protected]> wrote:
> > > >> > >
> > > >> > > > I have been thinking on a Redshift reader/writer,
> > > >> > > > basically to wrap UNLOAD and COPY in a PTransform. For
> > > >> > > > example, steps to UNLOAD into a PCollection:
> > > >> > > >
> > > >> > > > 1) JDBC to Redshift - UNLOAD
> > > >> > > > <http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html>
> > > >> > > > TO 's3://bucket/tmp-prefix'
> > > >> > > > 2) S3 to PCollection - work in progress
> > > >> > > > <https://github.com/Kochava/beam-s3>
> > > >> > > > 3) delete tmp files from S3
> > > >> > > >
> > > >> > > > To implement steps 1 and 3, I can't see a way to perform a
> > > >> > > > task exactly once, globally, in a PTransform. Sure, I
> > > >> > > > could do those steps in main() or even in a separate
> > > >> > > > script, but the result isn't code that can be shared and
> > > >> > > > reused very well.
> > > >> > > >
> > > >> > > > Am I missing something? Seems like the kind of problem
> > > >> > > > that I shouldn't be the first to encounter.
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > >
> > > >> > > > Jacob
> > > >> >
> > > >> > --
> > > >> > Jacob
> > > >
> > > > --
> > > > Jacob
> > >
> > > --
> > > Jacob
>
> --
> Jacob
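
On the idempotence point raised in the thread: since a runner may retry the "once" operation, step 3 (deleting the temporary files) should tolerate being run more than once. The sketch below is only an illustration in plain Java, not Beam or S3 code. It assumes local files as a stand-in for the S3 objects, and the class and method names (IdempotentCleanup, createTmpFiles, deleteAll) are hypothetical. The only library call it relies on for the idempotent behaviour is java.nio.file.Files.deleteIfExists, which returns false instead of failing when the file is already gone.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class IdempotentCleanup {

    // Hypothetical stand-in for the temporary files an UNLOAD would leave behind.
    public static List<Path> createTmpFiles(int count) {
        try {
            Path dir = Files.createTempDirectory("unload-tmp");
            List<Path> files = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                files.add(Files.createFile(dir.resolve("part-" + i)));
            }
            return files;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deletes each file if it is still present and returns how many were
    // actually removed. Calling it again on the same list is a no-op.
    public static int deleteAll(List<Path> files) {
        int removed = 0;
        try {
            for (Path file : files) {
                if (Files.deleteIfExists(file)) {
                    removed++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return removed;
    }

    public static void main(String[] args) {
        List<Path> files = createTmpFiles(3);
        System.out.println(deleteAll(files)); // first attempt removes the files
        System.out.println(deleteAll(files)); // a retry finds nothing left and still succeeds
    }
}
```

The same shape should carry over to a delete step inside a DoFn: treat "already deleted" as success rather than as an error, so a retried bundle does not fail the pipeline.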
