Hi Cham,

Thanks a lot for the clarifications!

(1) I wouldn't mind using BoundedSource; it's just that my impression was
that it was considered an anti-pattern. It seems most of the logic will
stay intact though, so it shouldn't really be a problem. Is the
BoundedSource API going to stay mostly the same after the introduction of
SplittableDoFn?

(2) Makes sense, thanks!

(3) Thanks for the tip! This makes me wonder whether some sort of
test-time fusion detector would be useful: something that takes a
pipeline, analyzes it, and prints out which transforms may be fused by
the runner. But maybe it's just easier to remember that ParDo steps tend
to fuse.

On a side note, as you may see, I've been putting the column data into
the PCollection (for example, here:
https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L72),
but reading more code made me realize that I should have used side inputs
for that instead?

On Mon, Jun 12, 2017 at 4:44 PM, Chamikara Jayalath wrote:

> Hi Dmitry,
>
> Thanks for writing this. Some general comments.
>
> (1) Do you want to implement this using ParDos or using the
> BoundedSource [1] API? Using the BoundedSource API has some benefits,
> such as support for dynamic work rebalancing (see [2]), though using
> ParDos will be more future-proof (dynamic work rebalancing will be
> supported sometime in the future through the SplittableDoFn API [3][4]).
>
> (2) It seems that the Java BigQuery source deletes the temporary table
> at the location you mentioned, and deletes the temporary exported files
> by mapping the directory path to the pipeline's temporary path (which
> hopefully gets deleted by the runner). You should be able to use a
> similar approach in the Python SDK. You should not delete exported files
> in a ParDo, since a runner might rerun stages of a pipeline.
>
> (3) If you are using the ParDo-based approach, you should add a
> GroupByKey between the ParDos 'GetS3Files' and 'LoadDataFromS3'.
> Otherwise all of your ParDos might get fused into a single stage and you
> might end up reading all the data from a single worker.
>
> Thanks,
> Cham.
>
> [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L73
> [2] https://beam.apache.org/documentation/io/authoring-overview/#read-transforms
> [3] https://s.apache.org/splittable-do-fn
> [4] https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit#
>
>
> On Mon, Jun 12, 2017 at 2:22 PM Dmitry Demeshchuk wrote:
>
>> Hi, list,
>>
>> I was hoping someone could give me a general code review on a Redshift
>> source I wrote:
>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1.
>> It also relies on the modules `s3` and `config` from our internal
>> library. I can add them too if needed; it was just more hassle to open
>> up the entire repository, since it contains some company-specific code
>> at the moment.
>>
>> I was also hoping to find out whether you'd want me to file a pull
>> request; we'd be totally fine with open-sourcing this piece, as well as
>> some other AWS sources and sinks in the future.
>>
>> Finally, I have a specific question about cleanup. My impression was
>> that
>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L153
>> would help make sure that there's no possible data loss after we delete
>> the S3 files. However, in a personal conversation Eugene Kirpichev
>> pointed out that this approach does not ensure that the PCollection is
>> persisted, and that Dataflow will just fuse multiple phases together.
>>
>> Also, Eugene pointed out that this cleanup problem has been worked
>> around in the BigQuery source in the Java SDK. To my understanding,
>> it's this one:
>> https://github.com/apache/beam/blob/70e53e7dc5d58e4d9f88c6d4f1cff036429429c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L100.
>> However, I don't yet know enough about the parity between the Java and
>> Python SDKs to tell whether I can implement a Python source in a
>> similar fashion (from what I remember, implementing sources is
>> generally frowned upon, as opposed to writing a DoFn instead).
>>
>> Any thoughts and suggestions would be highly appreciated.
>>
>> Thank you.
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>

--
Best regards,
Dmitry Demeshchuk.
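
P.S. For reference, here is roughly how I understand the GroupByKey
suggestion in (3), combined with passing the columns as a side input
instead of carrying them in the PCollection. This is only a sketch, not
the code from the gist: `list_s3_files`, `load_from_s3`, `build_pipeline`,
the fake file names and the fake rows are all made up for illustration.
Only the step names 'GetS3Files' and 'LoadDataFromS3' come from the
thread.

```python
def list_s3_files(prefix):
    """Hypothetical stand-in for GetS3Files: pretend four files were unloaded."""
    return ["%s/part-%04d" % (prefix, i) for i in range(4)]


def load_from_s3(path, columns):
    """Hypothetical stand-in for LoadDataFromS3.

    A real version would download and parse `path`; this one yields a single
    fake row so that the shape of the output is visible.
    """
    fake_values = [path] * len(columns)
    yield dict(zip(columns, fake_values))


def build_pipeline(pipeline, prefix, column_names):
    # Imported here so the two helpers above can be tried without Beam.
    import apache_beam as beam

    # One-element PCollection holding the column list, used as a side input.
    columns = pipeline | "Columns" >> beam.Create([column_names])

    return (
        pipeline
        | "Prefix" >> beam.Create([prefix])
        | "GetS3Files" >> beam.FlatMap(list_s3_files)
        # Key each path, group, then drop the key. The GroupByKey sits
        # between the two steps so the runner should not fuse them.
        | "KeyByPath" >> beam.Map(lambda path: (path, None))
        | "BreakFusion" >> beam.GroupByKey()
        | "DropKey" >> beam.Map(lambda kv: kv[0])
        | "LoadDataFromS3" >> beam.FlatMap(
            load_from_s3, columns=beam.pvalue.AsSingleton(columns))
    )
```

It would be used as `with beam.Pipeline() as p: build_pipeline(p, prefix,
["id", "name"])`. As far as I can tell, newer SDK releases also ship a
Reshuffle transform that packages the same key/group/unkey idea, which
may be preferable where it is available.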
