We're writing text: line-delimited JSON, one record per line.

We've written the DoFn, but our issue is that dozens of instances per
window are being created for a relatively small amount of data.  I
haven't looked into it closely, but it may be as many as one DoFn
instance per record.

Here is some pseudocode showing how we're attempting to write to Azure:

class AzureWriter extends DoFn<String, Void> {

  private final String azureFileName;

  public AzureWriter(String azureFileName) {
    this.azureFileName = azureFileName;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Write each string to a temp file, keeping track of a unique
    // hash of the file to use as a blockId.
  }

  @FinishBundle
  public void finishBundle() {
    // Upload our temp file(s) as blocks to the block blob.
  }
}
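
For context, the writer sits at the end of the pipeline, roughly like this
(here "decoded" stands in for our PCollection<String> of decoded records,
and the file name is illustrative):

decoded
    .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))))
    .apply(ParDo.of(new AzureWriter("records.json")));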

This scheme works great when each DoFn instance receives many elements
(which is how I wrote the unit tests; I assumed that would be the case).
However, that doesn't appear to be happening in practice, and I'm not sure
whether, or how, I can force Apache Beam to bundle more elements per DoFn
instance.
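
For reference, here is a rough sketch of what option 1 from my original
message below (stateful buffering, as in the StackOverflow answer) might
look like.  It assumes the input has been keyed first, since state is kept
per key and window; the state/timer names are mine, and exact package
names may differ by SDK version:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

class BufferingAzureWriter extends DoFn<KV<String, String>, Void> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(c.element().getValue());
    flush.set(window.maxTimestamp());  // fire once the window closes
  }

  @OnTimer("flush")
  public void onFlush(@StateId("buffer") BagState<String> buffer) {
    // Upload everything buffered for this key/window as one block blob.
    for (String line : buffer.read()) {
      // write line to Azure
    }
    buffer.clear();
  }
}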

So I'm leaning toward option 3, where we would do a GroupByKey before the
DoFn that writes to Azure, which would change the signature from
DoFn<String, Void> to DoFn<KV<String, Iterable<String>>, Void> (GroupByKey
produces key-value pairs).  I don't know what the consequences would be if
the grouped data were too large to fit on a single machine, though: would
the records be split across multiple Iterable<String> values to distribute
the load?
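
Here's a minimal sketch of what I have in mind for option 3 (again
"windowedLines" stands in for our windowed PCollection<String>, and
azureFileNameFor is a hypothetical function that picks the key/file name):

windowedLines
    .apply(WithKeys.of((String line) -> azureFileNameFor(line))
        .withKeyType(TypeDescriptors.strings()))
    .apply(GroupByKey.<String, String>create())
    .apply(ParDo.of(
        new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Upload all values for this key/window as one block blob.
            for (String line : c.element().getValue()) {
              // write line to the temp file / upload
            }
          }
        }));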

Can you give me a rough timeline for when the IOChannelFactory refactor
will be in the snapshot build?

Thanks for the reply.

On Wed, Apr 19, 2017 at 12:33 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Parker,
>
> What's the format you want to write on Azure?
>
> Right now, I would say the easiest way is to write your own DoFn that
> writes on Azure.
>
> By the way, we are working on a complete IOChannelFactory refactoring with
> Beam filesystems. It means that we will be able to write a Beam
> AzureFileSystem with a corresponding scheme that you will be able to use
> like (assuming you want to write a text format):
>
> pipeline.apply()...apply(TextIO.write().to("azure:/..."))
>
> Regards
> JB
>
>
> On 04/19/2017 06:52 PM, Parker Coleman wrote:
>
>> Sorry if this is a double-post; I tried using the web client but didn't
>> see the post show up, so instead I've subscribed to the list and am
>> sending this via my email client.
>>
>> We're working on an Apache Beam pipeline that gathers messages from
>> various pub/sub topics, decodes them, bundles them into groups by hour
>> (using a FixedWindow), and sends them to Azure.  The eventual format of
>> the records is line-delimited JSON.
>>
>> There isn't a sink written for Azure yet, and we want the ability to
>> control which file names get created for each window firing (we want the
>> date/time appended to the file name; it looks like existing sinks don't
>> give you much control over the filename outside of specifying sharding).
>>
>> We decided to try writing our data to Azure via a DoFn instead.  In
>> processElement, it writes a single record to a temp file.  In the
>> finishBundle step, we acquire a lease on a blockId based on the hash of
>> our temp file and upload the data.
>>
>> This would work fine if each instance of our DoFn were used for more than
>> one record.  However, we're running into the same issue described at
>> http://stackoverflow.com/questions/42914198/buffer-and-flush-apache-beam-streaming-data,
>> where each of our records gets its own DoFn instance.
>>
>> So my question is: What's the best way to proceed?  I see a few options:
>>
>> 1.) Try implementing the stateful processing mechanism suggested in the
>> StackOverflow post: keep this state instead of the temp files, and fire
>> the writes via triggers.
>> 2.) Try implementing a FileBasedSink.  I'd prefer the flexibility of the
>> Sink abstract class, but looking at the Apache Beam Jira project, that
>> might be going away soon (https://issues.apache.org/jira/browse/BEAM-1897),
>> so I'd like to stay away from it given that uncertainty.
>>
>> I saw in this StackOverflow post
>> http://stackoverflow.com/questions/41386717/azure-blob-support-in-apache-beam
>> that it might be nice to have a dedicated Azure sink; if I could get a
>> good rundown of what needs to be done, I might be able to contribute this.
>> 3.) Do a GroupBy when our window fires.  This way, instead of a
>> DoFn<String>, it would be a DoFn<Iterable<String>>.  I'm not sure what
>> would happen if the result of the GroupBy couldn't fit in a single
>> machine's memory, though (say we uploaded a bunch of data during that
>> one hour).  Would the Iterable<String>'s records be split across multiple
>> machines?  If so, I like this solution best.
>> 4.) Something else?
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
