Enforcing order is really tricky with Kafka.   The only way to enforce
order is to reduce the number of nodes doing the processing.  You can have
one NiFi node (e.g., the cluster's primary node) read from Kafka and have it
distribute the workload to the other NiFi nodes while enforcing ordering.
Or you may want to batch the records up into, say, 10-15 minute chunks.
Or you could use a staging table.
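As a rough sketch of the batching idea (not a full NiFi flow - the field
names op_ts and pos are the GoldenGate JSON fields discussed further down
this thread, and the sample values are made up), you would buffer a chunk of
records and sort them before applying:

```python
# Sketch: buffer a micro-batch of GoldenGate JSON records and apply them
# in order. Assumes each record carries "op_ts" (operation timestamp) and
# "pos" (trail file number + RBA), as discussed later in this thread.
import json

def apply_in_order(raw_messages, sink):
    """Parse a batch of GG JSON messages, sort by (op_ts, pos), then apply."""
    records = [json.loads(m) for m in raw_messages]
    # op_ts alone is not unique; pos breaks ties within the same timestamp.
    records.sort(key=lambda r: (r["op_ts"], r["pos"]))
    for r in records:
        sink(r)  # e.g., a PutHBaseJSON-style write, applied sequentially

# Two ops with the same op_ts, arriving out of order:
batch = [
    '{"op_ts": "2018-11-05 10:00:00.000", "pos": "00000000020000001500", "op_type": "U"}',
    '{"op_ts": "2018-11-05 10:00:00.000", "pos": "00000000020000001200", "op_type": "I"}',
]
applied = []
apply_in_order(batch, applied.append)
print([r["op_type"] for r in applied])  # -> ['I', 'U']
```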

You could also have something mark each record with a sequence number to
make sure they run in order.   I am not sure whether GoldenGate can annotate
them itself.   I think the Kafka offset (a per-partition sequence number)
could help.
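To sketch what that Kafka sequence number buys you (the function names below
are illustrative, not a real Kafka client API): within a single partition the
offset is strictly increasing, so if the producer keys every change by the
row's primary key, all changes for that row land in the same partition and
the offset orders them:

```python
# Sketch: Kafka guarantees order only within a partition. Keying messages
# by the row's primary key sends every change for that row to the same
# partition, where the offset gives a total order. Illustrative only.
import zlib

def partition_for(key, num_partitions):
    # Stand-in for a keyed partitioner: a stable hash of the key.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def restore_order(consumed):
    """Sort buffered (partition, offset, value) tuples back into
    commit order within each partition."""
    return sorted(consumed, key=lambda m: (m[0], m[1]))

# The same primary key always maps to the same partition:
assert partition_for("pk_42", 5) == partition_for("pk_42", 5)

# Out-of-order arrivals within partition 0 are restored by offset:
msgs = [(0, 7, "update"), (0, 5, "insert"), (1, 3, "delete")]
print(restore_order(msgs))  # -> [(0, 5, 'insert'), (0, 7, 'update'), (1, 3, 'delete')]
```

Note this only orders events per partition (i.e., per row key); it does not
give a global order across partitions.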


On Mon, Nov 12, 2018 at 12:16 PM Boris Tyukin <bo...@boristyukin.com> wrote:

> Faisal, BTW I stumbled upon this doc, which explains how the GoldenGate
> HBase handler works in a scenario similar to the one you've described:
>
> https://docs.oracle.com/goldengate/bd123210/gg-bd/GADBD/using-hbase-handler.htm#GADBD-GUID-1A9BA580-628B-48BD-9DC0-C3DF9722E0FB
>
> They provide an option to generate the timestamp for HBase on the client
> side - which is what I suggested earlier. In your case, you would need to
> build this logic in NiFi. I still think the op_ts,pos combo should give you
> a proper ordering of events (events sorted by op_ts and then by pos). Then
> you can come up with a rule to increment the actual timestamp for HBase by
> a millisecond, like Oracle does with their HBase handler.
>
> Really interested in what you end up doing - please share once you come up
> with a solution.
>
> Boris
>
> On Wed, Nov 7, 2018 at 7:53 AM Boris <boris...@gmail.com> wrote:
>
>> Sorry, I meant RBA. GG has a bunch of tokens you can add to your JSON
>> file - you can even create your own. POS should be good, and if op_ts does
>> not work for you, why not generate your own timestamp from POS (the Now()
>> expression)? You could also add another token that identifies the
>> transaction sequence number, and order by op_ts and then by transaction
>> sequence number. Please share what you end up doing.
>>
>> On Tue, Nov 6, 2018, 01:55 Faisal Durrani <te04.0...@gmail.com> wrote:
>>
>>> Hi Boris,
>>>
>>> Thank you for your reply.  Let me try to explain my data flow in detail.
>>> I am receiving the GG transactions in JSON format through Kafka, so I can
>>> only use the fields provided by the GG Kafka handler (JSON pluggable
>>> format). I think you meant the RBA value instead of rbc. I don't think we
>>> can receive the RBA value in the JSON, but there is a field called POS
>>> which is a concatenation of the source trail file number and the RBA, so
>>> we could probably use that in the EnforceOrder processor. But if we don't
>>> use the timestamp information, then we will run into the HBase versioning
>>> issue.  The idea behind using op_ts was to version each row of our target
>>> table and also help us with the DML operations. We are using the PK of
>>> each table as the row key of the target HBase table. Every new
>>> transaction (update/delete) on the table is logically inserted as a new
>>> row, but since it's the same row key, we can see each version of the row.
>>> The operation with the highest timestamp is the valid state of the row. I
>>> tested the EnforceOrder processor with the Kafka offset, and it skips all
>>> the records which arrive later than an older offset, which I don't
>>> understand. If I decide to use EnforceOrder on POS and the default
>>> timestamp in HBase, then it will skip ordering the Kafka messages
>>> arriving late, and that will cause the tables to go out of sync. In
>>> addition to this, I've read that EnforceOrder only orders the flow files
>>> on a single node, while we have a 5-node cluster. So I'm not sure how to
>>> combine all the flow files together on a single node (I know how to
>>> distribute them, i.e. by using S2S-RPG).
>>>
>>> I hope I have been able to explain my situation. Kindly let me know your
>>> views on this.
>>>
>>> Regards,
>>> Faisal
>>>
>>>
>>> On Mon, Nov 5, 2018 at 11:18 PM Boris Tyukin <bo...@boristyukin.com>
>>> wrote:
>>>
>>>> Hi Faisal, I am not Timothy, but you raise an interesting problem we
>>>> might face soon as well. I did not expect the situation you described; I
>>>> thought the transaction time would be different.
>>>>
>>>> Our intent was to use op_ts to enforce order, but another option is to
>>>> use the GG rbc value or the Oracle ROWSCN value - did you consider them?
>>>> The GG RBC should identify a unique transaction, and within every
>>>> transaction you can also get the operation #. You can also get the trail
>>>> file # and trail file position. GG is really powerful and gives you a
>>>> bunch of data elements that you can enable on your message.
>>>>
>>>>
>>>> https://docs.oracle.com/goldengate/1212/gg-winux/GWUAD/wu_fileformats.htm#GWUAD735
>>>>
>>>> Logdump is an awesome tool for looking into your trail files and seeing
>>>> what's in there.
>>>>
>>>> Boris
>>>>
>>>>
>>>>
>>>> On Mon, Nov 5, 2018 at 3:07 AM Faisal Durrani <te04.0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Timothy,
>>>>>
>>>>> Hope you are doing well. We have been using your data flow (
>>>>> https://community.hortonworks.com/content/kbentry/155527/ingesting-golden-gate-records-from-apache-kafka-an.html#
>>>>> )
>>>>> with slight modifications to store the data in HBase. To version the
>>>>> rows we have been using the op_ts of the GoldenGate JSON, but now we
>>>>> have found that multiple transactions can have the same op_ts, e.g.
>>>>> both an update and a delete can have the same op_ts, and if they arrive
>>>>> out of order at the PutHBaseJSON processor, they can cause the target
>>>>> table to go out of sync. I am using a cluster of NiFi nodes, so I
>>>>> cannot use the EnforceOrder processor to order the Kafka messages, as I
>>>>> understand it only orders the flow files on a single node and not
>>>>> across the cluster. Additionally, we have a separate topic for each
>>>>> table and we have several consumer groups. I tried using the current_ts
>>>>> column of the GoldenGate message, but if GG abends and restarts the
>>>>> replication, it will send the past data with a newer current_ts, which
>>>>> will also cause the tables to go out of sync. I was wondering if you
>>>>> could give us any ideas so that we can order our transactions
>>>>> correctly.
>>>>>
>>>>> Regards,
>>>>> Faisal
>>>>>
>>>>
