Sure thing, Sumo.  Also, if you're interested, there are some really
well thought-out and well-articulated 'enterprise integration patterns'
that much of this aligns with.  The correlation concept I'm referring
to corresponds to the Aggregator pattern:
http://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html

Thanks
Joe

On Fri, Jun 17, 2016 at 12:28 PM, Sumanth Chinthagunta
<xmlk...@gmail.com> wrote:
> Hi Joe,
> Thanks a lot for helping with the solution. I didn't understand before how 
> correlation-identifier works.
> I guess MergeContent pulls flowFiles with the same correlation-identifier 
> from the queue in a batch.
>
> I don’t really need 1_N in the file name, so this solution should work for 
> my case. I will try it and let you know.
> Thanks
> -Sumo
>
>> On Jun 17, 2016, at 6:27 AM, Joe Witt <joe.w...@gmail.com> wrote:
>>
>> Sumo,
>>
>> Should be doable.  The only part that may be tricky is the filename showing 
>> 1_N if that means the whole thing has to retain sequential ordering from 
>> source through destination.  Otherwise...
>>
>> When merging flowfiles together you need to decide 'how should content be 
>> merged' and 'how should attributes be merged'.  The properties that control 
>> these are 'Merge Format' and 'Attribute Strategy' respectively.  For the 
>> merge format you'll want binary concatenation.  For the attribute strategy 
>> the default of keeping only common attributes should likely be sufficient.  
>> That works because the only information you'll need for writing to HDFS is 
>> the common databaseName, tableName, and action.  When merging you'll merge 
>> by all three of these attributes combined.  You can do this by creating an 
>> attribute that combines those three things right after your extract 
>> attributes processor.
>>
>> Let's say your extract attributes pulls out 'databaseName', 'tableName' and 
>> 'action'.  If so, put an UpdateAttribute between your extract attributes 
>> and MergeContent (or you could use HashAttribute as well).  In it, create 
>> an attribute called 'correlation-identifier' and give it a value of 
>> ${databaseName}-${tableName}-${action}
>>
>> Then in MergeContent use that correlation-identifier attribute in the 
>> 'Correlation Attribute Name' property.
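
The correlation idea described above can be sketched outside NiFi. This is a minimal illustration in Python, not NiFi code: the function names `correlation_id` and `bin_by_correlation` and the sample flowfiles are hypothetical, and the key format simply mirrors the `${databaseName}-${tableName}-${action}` expression from the message.

```python
from collections import defaultdict


def correlation_id(attrs):
    """Build the combined key, mirroring ${databaseName}-${tableName}-${action}."""
    return f"{attrs['databaseName']}-{attrs['tableName']}-{attrs['action']}"


def bin_by_correlation(flowfiles):
    """Group (attributes, content) pairs into bins keyed by correlation id."""
    bins = defaultdict(list)
    for attrs, content in flowfiles:
        bins[correlation_id(attrs)].append(content)
    return dict(bins)


# Hypothetical sample data standing in for flowfiles arriving from Kafka.
flowfiles = [
    ({"databaseName": "db1", "tableName": "customer", "action": "insert"}, b'{"id": 1}'),
    ({"databaseName": "db1", "tableName": "address", "action": "update"}, b'{"id": 2}'),
    ({"databaseName": "db1", "tableName": "customer", "action": "insert"}, b'{"id": 3}'),
]

bins = bin_by_correlation(flowfiles)
```

Only items that share all three attribute values land in the same bin, which is why the common attributes survive the merge unchanged.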
>>
>> Now, given that you'll be smashing JSON documents together, keep in mind 
>> that the resulting smashed-together thing would not be valid JSON itself.  
>> You have two options.  Either make sure the merged output is itself valid 
>> JSON, which you can do with MergeContent's header/footer/demarcator 
>> feature, or have the thing that reads these merged JSON documents 
>> demarcate them for you.
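
To make the header/footer/demarcator point concrete, here is a small Python sketch. It is only an illustration of the idea, not MergeContent itself: the helper names and the choice of `[`, `]` and `,` as header, footer and demarcator are assumptions for the example.

```python
import json


def merge_with_delimiters(documents, header="[", footer="]", demarcator=","):
    """Wrap and separate documents so the merged text is a JSON array."""
    return header + demarcator.join(documents) + footer


def is_valid_json(text):
    """Return True if the text parses as a single JSON value."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


docs = ['{"id": 1}', '{"id": 2}', '{"id": 3}']

naive = "".join(docs)                 # plain concatenation, no delimiters
merged = merge_with_delimiters(docs)  # header + demarcated docs + footer
```

Here `naive` fails to parse as one JSON document, while `merged` parses as an array of the three objects.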
>>
>> If you want to end up with roughly 64MB bundles and these objects can be 
>> quite small (between say 1 and 10KB) then you'd be bundling roughly 6,500 
>> to 65,000 objects each time, and that is not factoring in compression.  I'd 
>> recommend a two-phase merge with a GZIP compression step in between.  
>> GZIP is nice as it compresses quite fast and its output can be safely 
>> concatenated.  So the 'merge step' would really be:
>> - First Merge
>> - GZIP Compress
>> - Final Merge
>>
>> In the first merge, do bundles of at least 800 objects but no more than 
>> 1000, and set an age kick-out of say 1 minute or whatever is appropriate in 
>> your case.
>> In the GZIP compress step, set level 1.
>> In the final merge, do bundles of at least 55MB but no more than 64MB, with 
>> an age kick-out of say 5 minutes or whatever is appropriate in your case.
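
The claim that GZIP output can be safely concatenated is easy to check with Python's standard `gzip` module. The sketch below imitates the three steps on tiny data; the function names, the newline demarcator and the sample objects are assumptions for illustration, and the bundle sizes are nowhere near the real 800-1000 object / 55-64MB settings.

```python
import gzip


def first_merge(objects, demarcator=b"\n"):
    """Phase 1: concatenate small objects into one newline-delimited bundle."""
    return demarcator.join(objects) + demarcator


def compress(bundle):
    """GZIP-compress a bundle at level 1 (fastest)."""
    return gzip.compress(bundle, compresslevel=1)


def final_merge(compressed_bundles):
    """Phase 2: plain byte concatenation of already-compressed bundles."""
    return b"".join(compressed_bundles)


objects_a = [b'{"id": 1}', b'{"id": 2}']
objects_b = [b'{"id": 3}']

final = final_merge([
    compress(first_merge(objects_a)),
    compress(first_merge(objects_b)),
])

# A multi-member gzip stream decompresses to the concatenated originals.
restored = gzip.decompress(final)
```

Because each compressed bundle is a complete gzip member, the final merge needs no knowledge of the contents and a standard reader still recovers every object.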
>>
>> Since the common attributes you need will be retained in this model, you 
>> will be able to write to HDFS using a path of something like 
>> '/${databaseName}/${tableName}/${action}/${uuid}.whatever'.
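
As a rough picture of how that path expression resolves from the retained attributes, here is a short Python sketch. The `hdfs_path` helper, the `whatever` extension placeholder and the sample attribute values are hypothetical; in NiFi the expression language does this substitution for you.

```python
import posixpath


def hdfs_path(attrs, extension="whatever"):
    """Resolve /${databaseName}/${tableName}/${action}/${uuid}.<extension>."""
    filename = f"{attrs['uuid']}.{extension}"
    return posixpath.join(
        "/", attrs["databaseName"], attrs["tableName"], attrs["action"], filename
    )


# Hypothetical attributes of one merged flowfile.
merged_attrs = {
    "databaseName": "db1",
    "tableName": "customer",
    "action": "insert",
    "uuid": "example-uuid",
}

path = hdfs_path(merged_attrs)
```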
>>
>> Now that I got here I just noticed you set 'tar', so presumably you are 
>> using the TAR merge format, most likely to keep these objects separate and 
>> avoid the need for header/footer/demarcator, etc.  Good choice as well.
>>
>> There are a lot of ways to slice this up.
>>
>> Thanks
>> Joe
>>
>> On Wed, Jun 15, 2016 at 6:04 PM, Sumanth Chinthagunta <xmlk...@gmail.com> 
>> wrote:
>>
>> Hi,
>> I have the following flow that receives JSON data from Kafka and writes it 
>> to HDFS.  Each flowFile received from Kafka has the following attributes 
>> and a JSON payload.
>> 1.      databaseName = db1 or db2 etc
>> 2.      tableName = customer or address etc
>> 3.      action = [insert, update, delete]
>>
>> My goal is to merge 1000 flowFiles into a single file and write it to HDFS 
>> (because writing large files into HDFS is more efficient than writing small 
>> JSON files).
>> I also want to write into an HDFS folder structure like:
>> /<databaseName>/<tableName>/<action>/1_1000.tar
>> /<databaseName>/<tableName>/<action>/1000_2000.tar
>>
>> With the default MergeContent configuration, I am losing the individual 
>> flowFiles’ attributes and cannot organize the bin files into a directory 
>> structure. Is it possible to accomplish my goal with MergeContent?
>>
>>
>> Thanks
>> -Sumo
>>
>
