Thanks. One more clarification: I suppose currently you're using a batch pipeline, but suppose that new files were constantly appearing and suppose you *did* use AvroIO.read().watchForNewFiles(), i.e. there would be no such thing as "after pipeline completes" because it never completes. In that case, when would you want to update the metadata table?
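For concreteness, a minimal sketch of what such a never-completing read might look like (this assumes Beam 2.2+ where watchForNewFiles is available; the file pattern, poll interval, and the fixed schema are placeholders, since in the use case below the schema is only discovered from the files themselves):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class ContinuousAvroRead {
  // Placeholder schema just to make the sketch self-contained; in the use case
  // discussed below the schema is not known up front.
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Unbounded read: keep polling the filepattern for new files, so the
    // pipeline never "completes" in the batch sense.
    PCollection<GenericRecord> records =
        p.apply(
            AvroIO.readGenericRecords(SCHEMA_JSON)
                .from("gs://my-bucket/input/*.avro")      // placeholder pattern
                .watchForNewFiles(
                    Duration.standardMinutes(1),          // poll interval
                    Watch.Growth.<String>never()));       // never stop watching

    // ... generate and write mutations downstream. Any "all done" marker now
    // has to be tied to a window/trigger firing rather than to pipeline
    // completion, which is exactly the question raised above.

    p.run();
  }
}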
On Fri, Aug 25, 2017 at 10:53 AM Steve Niemitz <[email protected]> wrote:

> - Correct! And I'm excited for 2.2 to be released so I don't need my own
> build :)
> - "checkpoint" might have been a bad term to use. The schema (or at least
> a derivative of it) is persisted to a metadata table which is consumed by
> readers (outside of beam). The process is a little more complicated than
> I'm describing, but this is the gist of it. The important thing to note is
> that it is dependent on the data read from the avro files in the first
> step.
>
> Let me know if you want me to go into more detail on anything!
>
> On Fri, Aug 25, 2017 at 1:43 PM, Eugene Kirpichov
> <[email protected]> wrote:
>
> > Thanks. A couple of questions:
> > - For step 1 (read Avro files with unknown schema), I presume you're
> > using AvroIO.parseGenericRecords()?
> > - For step 4: why do you need the checkpoint: is it because you actually
> > want to continuously ingest new Avro files as they keep appearing? In
> > that case you might want to take advantage of
> > https://github.com/apache/beam/pull/3725 when it's in.
> >
> > On Fri, Aug 25, 2017 at 10:39 AM Steve Niemitz <[email protected]>
> > wrote:
> >
> > > Having the sink output something I think is a good option.
> > >
> > > My use case looks something like this:
> > > 1) Read a bunch of avro files (all with the same schema, but the
> > > schema is not known beforehand)
> > > 2) Use the avro schema + data to generate bigtable mutations
> > > 3) Write the mutations
> > > 4) *Once all files are processed and written* -> update a "checkpoint"
> > > marker in another bigtable table, which also depends on the schema
> > > from (1).
> > >
> > > I've hacked this up by making the flow in step 4 rely on an output
> > > from step 2 that goes through a GroupBy to ensure that all records
> > > are at least processed by step 2 before step 4 runs, but there's
> > > still a race condition between the last record being emitted by step
> > > 2 and the write in step 3 completing.
> > >
> > > If, as you said, the sink emitted a record when it completed, that'd
> > > solve the race condition.
> > >
> > > In summary: right now the flow looks like this (terrible ASCII attempt):
> > >
> > > Read Avro Files (extract schema + data) (1)
> > >   |
> > >   V
> > > Generate mutations (2)
> > >   |---------> [GroupBy -> Take first -> Generate mutation -> Bigtable write] (4)
> > >   V
> > > Write mutations (3)
> > >
> > > On Fri, Aug 25, 2017 at 11:53 AM, Eugene Kirpichov
> > > <[email protected]> wrote:
> > >
> > > > I'd like to know more about both of your use cases, can you
> > > > clarify? I think making sinks output something that can be waited
> > > > on by another pipeline step is a reasonable request, but more
> > > > details would help refine this suggestion.
> > > >
> > > > On Fri, Aug 25, 2017, 8:46 AM Chamikara Jayalath
> > > > <[email protected]> wrote:
> > > >
> > > > > Can you do this from the program that runs the Beam job, after
> > > > > the job is complete (you might have to use a blocking runner or
> > > > > poll for the status of the job)?
> > > > >
> > > > > - Cham
> > > > >
> > > > > On Fri, Aug 25, 2017 at 8:44 AM Steve Niemitz <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > I also have a similar use case (but with BigTable) that I feel
> > > > > > like I had to hack up to make work. It'd be great to hear if
> > > > > > there is a way to do something like this already, or if there
> > > > > > are plans in the future.
> > > > > >
> > > > > > On Fri, Aug 25, 2017 at 9:46 AM, Chaim Turkel <[email protected]>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > I have a few pipelines that are an ETL from different systems
> > > > > > > to bigquery.
> > > > > > > I would like to write the status of the ETL after all records
> > > > > > > have been updated to the bigquery.
> > > > > > > The problem is that writing to bigquery is a sink and you
> > > > > > > cannot have any other steps after the sink.
> > > > > > > I tried a sideoutput, but this is called in no correlation to
> > > > > > > the writing to bigquery, so I don't know if it succeeded or
> > > > > > > failed.
> > > > > > >
> > > > > > > any ideas?
> > > > > > > chaim
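
For reference, in the pure batch case that started this thread, Cham's suggestion above can be as simple as blocking on the PipelineResult in the launcher program and only then writing the status row; a rough sketch, where the status-writing helper is a placeholder (not a Beam API):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;

public class RunEtlThenWriteStatus {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // ... build the ETL here: read sources, transform, write to the sink ...

    PipelineResult result = p.run();

    // Block until the job reaches a terminal state (requires a runner that
    // supports waitUntilFinish; otherwise poll result.getState()).
    PipelineResult.State state = result.waitUntilFinish();

    if (state == PipelineResult.State.DONE) {
      writeEtlStatus();  // hypothetical helper
    }
  }

  private static void writeEtlStatus() {
    // Placeholder: update the status/metadata table directly with the
    // BigQuery or Bigtable client library, outside the pipeline.
  }
}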
