We're writing text (JSON fields, one per line). We've written the DoFn, but our issue is that dozens of instances per window are being generated for a relatively small amount of data. I haven't looked into it, but it might be as many as one DoFn object per record.
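One way we could confirm that would be to count elements per bundle from inside the DoFn. Just a rough sketch (the counter and log line are illustrative, not something we have today; assumes slf4j, which Beam already uses):

    private static final Logger LOG = LoggerFactory.getLogger(AzureWriter.class);
    private transient long elementsInBundle;

    @StartBundle
    public void startBundle() {
      elementsInBundle = 0;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      elementsInBundle++;
      // ... existing per-element logic ...
    }

    @FinishBundle
    public void finishBundle() {
      LOG.info("Bundle finished with {} elements", elementsInBundle);
    }

If that logged "1 element" for most bundles, it would confirm the one-DoFn-per-record behavior.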
Some pseudo code explaining how we're attempting to write to Azure:

    public class AzureWriter extends DoFn<String, Void> {
      private final String azureFileName;

      public AzureWriter(String azureFileName) { this.azureFileName = azureFileName; }

      @ProcessElement
      public void processElement(ProcessContext c) {
        // Write each string to a temp file, keeping track of a unique
        // hash of the file to use as a blockId.
      }

      @FinishBundle
      public void finishBundle() {
        // Upload our temp file(s) as blocks to the block blob.
      }
    }

This scheme works great if each DoFn instance gets many elements (which is how I wrote the unit tests; I assumed this would be the case). However, that doesn't appear to be happening, and I'm not sure how, or whether, I can force Apache Beam to bundle more elements per DoFn instance.

So I'm leaning toward option 3, where we would do a GroupBy before the DoFn that writes to Azure, which would change the signature from DoFn<String, Void> to something like DoFn<KV<String, Iterable<String>>, Void>. I don't know what the consequences of this would be if the amount of data were too large to fit on a single machine, though. Would different Iterable<String> objects be created to distribute the load?
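Concretely, something like this is what I have in mind (the constant "all" key and the inline DoFn are just placeholders):

    PCollection<String> jsonLines = ...;  // hourly FixedWindows already applied
    jsonLines
        .apply(WithKeys.of("all"))                       // constant key: one group per window
        .apply(GroupByKey.<String, String>create())
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            for (String line : c.element().getValue()) {
              // buffer each line, then do one upload per group
              // in place of the finishBundle() upload above
            }
          }
        }));

If a single group per window turned out to be too big, I suppose we could shard the key (e.g., a hash of the record modulo N) to spread a window across N groups, at the cost of N files per window.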
Can you give me a rough timeline on when the IOChannelFactory refactor will be in the snapshot build?

Thanks for the reply.

On Wed, Apr 19, 2017 at 12:33 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Parker,
>
> What's the format you want to write on Azure?
>
> Right now, I would say the easiest way is to write your own DoFn that
> writes on Azure.
>
> By the way, we are working on a complete IOChannelFactory refactoring with
> Beam filesystems. It means that we will be able to write a Beam
> AzureFileSystem with a corresponding scheme that you will be able to use
> like (assuming you want to write a text format):
>
> pipeline.apply()...apply(TextIO.write().to("azure:/..."))
>
> Regards
> JB
>
> On 04/19/2017 06:52 PM, Parker Coleman wrote:
>
>> Sorry if this is a double-post; I tried using the web client but didn't
>> see the post show up, so instead I've subscribed to the list and am
>> sending this via my email client.
>>
>> We're working on an Apache Beam pipeline that gathers messages from
>> various pub/sub topics, decodes them, bundles them into groups by hour
>> (using a FixedWindow) and sends them to Azure. The eventual format of
>> the records is line-delimited JSON.
>>
>> There isn't a sink written for Azure yet, and we want the ability to
>> control which file names get created for each window firing (we want
>> the date/time appended to the file name; it looks like existing sinks
>> don't give you much control over the filename outside of specifying
>> sharding).
>>
>> We decided to try writing our data to Azure via a DoFn instead. In the
>> processElement step it writes a single record to a temp file. In the
>> finishBundle step, we acquire a lease on a blockId based off the hash
>> of our temp file, and upload the data.
>>
>> This would work fine if each instance of our DoFn were used for more
>> than one record. However, we're running into the same issue described
>> here (http://stackoverflow.com/questions/42914198/buffer-and-flush-apache-beam-streaming-data):
>> each of our records is getting its own DoFn instance.
>>
>> So my question is: What's the best way to proceed? I see a few options:
>>
>> 1.) Try implementing the stateful processing/trigger mechanism suggested
>> in the stackoverflow post. Use this state instead of the temp files and
>> trigger writes via triggers.
>> 2.) Try implementing a FileBasedSink. I'd prefer the flexibility of the
>> Sink abstract class, but looking at the Apache Beam Jira project, that
>> might be going away soon (https://issues.apache.org/jira/browse/BEAM-1897),
>> so I'd like to stay away from that based on that uncertainty. I saw in
>> this stack overflow post
>> (http://stackoverflow.com/questions/41386717/azure-blob-support-in-apache-beam)
>> that it might be nice to have a dedicated Azure Sink; if I could get a
>> good rundown of what needs to be done, I might be able to contribute this.
>> 3.) Do a GroupBy when our window fires. This way instead of having a
>> DoFn<String>, it would be a DoFn<Iterable<String>>. I'm not sure what
>> would happen if the result of the GroupBy couldn't fit in a single
>> machine's memory though (say we uploaded a bunch of data during that
>> 1 hour). Would the Iterable<String>'s records be split across multiple
>> machines? If so, I like this solution best.
>> 4.) Something else?
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com