Unsubscribe 

Sent from Tianchu(Alex) iPhone

On Dec 4, 2018, at 00:00, Nirmal Manoharan <nirmalmano....@gmail.com> wrote:

I am trying to deduplicate streaming data using the dropDuplicates function 
with a watermark. The problem I am facing is that I have two 
timestamps for a given record:
1. One is the event timestamp: the time the record was created at the source.
2. The other is a transfer timestamp: the time assigned by an intermediate 
process that is responsible for streaming the data. 
The duplicates are introduced during the intermediate stage, so for a given 
record and its duplicate, the event timestamp is the same but the transfer 
timestamp is different. 

For the watermark, I would like to use the transfer timestamp because I know 
the duplicates can't occur more than 3 minutes apart in transfer. But I can't 
use it within dropDuplicates because it won't capture the duplicates, as the 
duplicates have different transfer timestamps. 

Here is an example:
        Event 1: {"EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:05:00.00"}
        Event 2 (duplicate): {"EventString": "example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:08:00.00"}

In this case, the duplicate was created during transfer, 3 minutes after the 
original (Event 1).

My code is as follows:

streamDataset
    .withWatermark("transferTimestamp", "4 minutes")
    .dropDuplicates("eventstring", "transferTimestamp");

The above code won't drop the duplicates, as transferTimestamp differs between 
the event and its duplicate. But currently, this is the only way, as Spark 
forces me to include the watermark column in the dropDuplicates function. 

I would really like to see a dropDuplicates implementation like the one below, 
which would be a valid case for any stream with at-least-once semantics: I 
don't have to include the watermark field in dropDuplicates, and the 
watermark-based state eviction is still honored. 
streamDataset
    .withWatermark("transferTimestamp", "4 minutes")
    .dropDuplicates("eventstring");
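
To make the requested behavior concrete, here is a minimal sketch in plain Java, without Spark. It deduplicates on the event string alone and evicts state once the transfer-timestamp watermark has passed. The class WatermarkDeduplicator and its methods are hypothetical names used only for illustration, not a Spark API, and the watermark is advanced per record here, which is a simplification of how a streaming engine would advance it per batch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Spark API): key-only dedup with watermark-based eviction. */
class WatermarkDeduplicator {
    private final Duration delay;                                     // e.g. "4 minutes"
    private final Map<String, Instant> firstSeen = new HashMap<>();   // key -> transfer time of first arrival
    private Instant maxTransferTime = null;                           // highest transfer timestamp seen so far

    WatermarkDeduplicator(Duration delay) {
        this.delay = delay;
    }

    /** Returns true if the record should be emitted, false if it is a duplicate. */
    boolean accept(String eventString, Instant transferTimestamp) {
        if (maxTransferTime == null || transferTimestamp.isAfter(maxTransferTime)) {
            maxTransferTime = transferTimestamp;
        }
        // Watermark = max transfer time seen minus the allowed delay.
        Instant watermark = maxTransferTime.minus(delay);
        // Evict keys whose first arrival is older than the watermark.
        firstSeen.values().removeIf(seen -> seen.isBefore(watermark));
        // The key is only the event string; the transfer timestamp is stored, not compared.
        return firstSeen.putIfAbsent(eventString, transferTimestamp) == null;
    }

    /** Number of keys currently held in state. */
    int stateSize() {
        return firstSeen.size();
    }
}
```

With the two example events above and a 4 minute delay, the first call would return true and the second false, because the key is still in state when the duplicate arrives 3 minutes later. A record arriving well after the watermark has moved on would be treated as new again, which is the trade-off of bounding the state.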

If anyone has an alternative solution for this, please let me know. I can't use 
the event timestamp, as it is not ordered and its time range varies drastically 
(delayed events and junk events). 

Thanks in advance
-Nirmal
