We are actively looking at how to support parallel writing. But, as you can 
imagine, when it comes to updates and to avoiding duplicates on insert, only 
one writer should be running. One of the design choices we have made so far is 
to avoid any external dependency in running hudi, so as not to add operational 
burden for users. Having said that, some users have used a coordination 
service like zookeeper to guarantee mutual exclusion on top of their hudi jobs 
(typically orchestrated with a scheduler such as Airflow). You can model your 
pipelines to achieve this. But, just to reiterate, we are looking more closely 
at how to best support concurrent ingestion.
Please consider this when you are designing your pipelines.
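
For illustration, here is a minimal sketch of that kind of external 
coordination, assuming ZooKeeper via Apache Curator wrapped around a Spark 
datasource write. The connect string, lock path, table name, and field names 
are all placeholders, not anything hudi ships:

  import org.apache.curator.framework.CuratorFrameworkFactory
  import org.apache.curator.framework.recipes.locks.InterProcessMutex
  import org.apache.curator.retry.ExponentialBackoffRetry
  import org.apache.spark.sql.{DataFrame, SaveMode}

  // Guard a hudi write with a ZooKeeper-backed mutex so only one writer
  // touches the dataset at a time (hosts, paths, fields are placeholders).
  def writeWithLock(df: DataFrame, basePath: String, tableName: String): Unit = {
    val zk = CuratorFrameworkFactory.newClient(
      "zk-host:2181", new ExponentialBackoffRetry(1000, 3))
    zk.start()
    val lock = new InterProcessMutex(zk, s"/hudi-locks/$tableName")
    lock.acquire()
    try {
      df.write.format("hudi")
        .option("hoodie.table.name", tableName)
        .option("hoodie.datasource.write.recordkey.field", "event_id")
        .option("hoodie.datasource.write.partitionpath.field", "event_date")
        .option("hoodie.datasource.write.precombine.field", "event_ts")
        .option("hoodie.datasource.write.operation", "upsert")
        .mode(SaveMode.Append)
        .save(basePath)
    } finally {
      lock.release()
      zk.close()
    }
  }

The same effect can also be had purely at the orchestration level, e.g. an 
Airflow DAG that simply never schedules two writer tasks for the same dataset 
at the same time.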
Balaji.V

On Tuesday, July 21, 2020, 11:06:49 PM PDT, Lian Jiang
<jiangok2...@gmail.com> wrote:
 Thanks Balaji.
Appreciate the answer and the jira creation. Below is the improved design after 
some investigation.

[design diagram image omitted]
The differences between it and my previous diagram are:

1. One delta streamer produces one hudi dataset (as opposed to one delta 
streamer producing multiple hudi datasets). Delta streamer's --target-table 
option indicates that one delta streamer job produces one hudi dataset, and 
its --source-class option indicates that one delta streamer job can have only 
one source. So one delta streamer cannot support streaming and backfill at the 
same time.
2. Each delta streamer will have its own event extractor plugin to extract the 
desired type of events (a sketch follows this list).
3. All delta streamers will sync their hudi datasets to Hive so that users can 
query via Hive without worrying whether the underlying format is parquet or 
hudi.
4. Each hudi dataset's backfill is handled by a separate backfill job, 
assuming the backfill job and the delta streamer can work correctly when 
writing into the same dataset concurrently.
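
For point 2, here is a minimal sketch of what such an extractor could look 
like, assuming the monolithic record carries each event type as a JSON-string 
array field. The field names, nested layout, and helper are hypothetical; in 
the real pipeline this logic would live in a delta streamer source/transformer 
plugin:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.{col, explode, from_json}
  import org.apache.spark.sql.types._

  // Hypothetical extractor: pull one event type out of the monolithic record.
  def extractEvents(raw: DataFrame, eventType: String): DataFrame = {
    val eventSchema = ArrayType(new StructType()
      .add("event_id", StringType)
      .add("event_ts", TimestampType)
      .add("payload", StringType))

    raw
      .select(explode(from_json(col(s"events.$eventType"), eventSchema)).as("e"))
      .select(col("e.event_id"), col("e.event_ts"), col("e.payload"))
  }

  // Each extracted stream then feeds its own delta streamer / hudi dataset,
  // so every dataset keeps exactly one writer.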

Hope this design makes more sense than my previous one. I will keep you posted 
on any issues during development.



Regarding your feedback,
"... faithfully append event stream logs to S3 before you materialize them in 
a different order, you can try the "insert" mode in hudi, which would give you 
small file size handling."
I may need both "upsert" and "insert" for different hudi datasets. I would 
definitely prefer "insert" mode for append-only use cases.
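
As a concrete example of the append-only path, here is a sketch of an 
"insert"-mode write with hudi's file-sizing knobs, assuming the Spark 
datasource route (table name, fields, and size limits are placeholders; delta 
streamer has a corresponding operation setting):

  import org.apache.spark.sql.{DataFrame, SaveMode}

  // Sketch: append-only event log written with "insert", relying on hudi's
  // small-file handling to bin-pack tiny batches (limits below are examples).
  def appendEvents(df: DataFrame, basePath: String): Unit =
    df.write.format("hudi")
      .option("hoodie.table.name", "event_log")
      .option("hoodie.datasource.write.recordkey.field", "event_id")
      .option("hoodie.datasource.write.partitionpath.field", "event_date")
      .option("hoodie.datasource.write.precombine.field", "event_ts")
      .option("hoodie.datasource.write.operation", "insert")
      .option("hoodie.parquet.small.file.limit", "104857600")   // 100 MB
      .option("hoodie.parquet.max.file.size", "134217728")      // 128 MB
      .mode(SaveMode.Append)
      .save(basePath)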
"With 0.6, we are planning to allow multiple writers as long as there is 
guarantee that writers will be writing to different partitions. I think this 
will fit your requirement and also keep one timeline."
This is interesting. I want to expand on my use cases a little, since I am 
wondering how I can guarantee that writers write to different partitions.

Case 1: (mentioned above) the streaming delta streamer and the backfill job 
write into the same hudi dataset. I control both jobs.
Case 2: the delta streamer keeps ingesting while a CCPA/GDPR job deletes some 
customer data from the same hudi dataset from time to time. The CCPA job could 
be owned by another infra team.

In case 1, how do I control my jobs to guarantee that the delta streamer and 
the backfill job write to different partitions, especially since late-arriving 
events could land in an arbitrary earlier partition? In case 2, it will be 
hard for different teams' jobs to coordinate with each other to avoid 
partition conflicts.
As you can see, it may not be easy for applications to provide such a 
guarantee. Is it possible for the hudi writers to coordinate among themselves 
using some locking mechanism? IMHO, it is ok to sacrifice some performance to 
make concurrent writing correct.
Appreciate your insight.

Regards,
Lian






On Tue, Jul 21, 2020 at 2:13 AM Balaji Varadarajan <v.bal...@ymail.com.invalid> 
wrote:

 Please see answers inline...

    On Sunday, July 19, 2020, 10:08:09 PM PDT, Lian Jiang 
<jiangok2...@gmail.com> wrote:  

 Hi,
I have a kafka topic and use a kafka s3 connector to dump its data into s3 
hourly in parquet format. These parquet files are partitioned by ingestion 
time, and each record has fields that are deeply nested jsons. Each record is 
a monolithic blob containing multiple events, each with its own event time. 
This causes two issues: 1. queries by event time are slow; 2. the data is hard 
to use due to the many levels of exploding. I plan to use the design below to 
solve these problems.

In this design, I still use the s3 parquet files dumped by the Kafka S3 
connector as a backfill source for the hudi pipeline. This is because the S3 
connector pipeline is easier than the hudi pipeline to set up and will be 
working before the hudi pipeline is. Also, the s3 connector pipeline may be 
more reliable than the hudi pipeline, given potential bugs in the delta 
streamer.

The delta streamer will decompose the monolithic kafka record into multiple 
event streams. Each event stream is written into one hudi dataset, partitioned 
and sorted by its corresponding event time. These hudi datasets are synced 
with hive, which is exposed for user queries, so users don't need to care 
whether the underlying table format is parquet or hudi.

Hopefully this design improves query performance, since the datasets are 
partitioned and sorted by event time rather than kafka ingestion time. User 
experience also improves because users query the extracted events directly.

Let us know if there are any issues with deltastreamer that prevent it from 
being used in the first stage. If you want to faithfully append event stream 
logs to S3 before you materialize them in a different order, you can try the 
"insert" mode in hudi, which would give you small file size handling. 

Questions:

1. Do you see any issue with the delta streamer handling both streaming and 
backfill at the same time? I know a hudi dataset cannot be written by multiple 
writing clients simultaneously. Also, I don't want the delta streamer to stop 
handling the streaming data while doing backfill. The delta streamer will use 
dynamic allocation, so assuming the cluster has enough capacity, the load 
caused by backfill should not be an issue.

With 0.6, we are planning to allow multiple writers as long as there is a 
guarantee that writers will be writing to different partitions. I think this 
will fit your requirement and also keep one timeline. 

2. If I want to time travel to a previous point in time (e.g. 11:00:00 AM PST 
on the first day of last month), how can I keep hudi 1 and hudi 2 (... hudi n) 
in sync? AFAIK, hudi time travel is done by commit instead of timestamp. 
Should I do the following:
 a. list the commits of these hudi datasets,
 b. find the commits that are close to each other and closest to the desired 
timestamp,
 c. apply time travel to each hudi dataset.
Is there an easier and more accurate way? Will hudi support time travel by 
timestamp in the future, as delta lake does? 


Commit time is like a timestamp, although in a specific format (seconds 
granularity). It should be straightforward to reformat a timestamp into a 
commit time and then use it in the WHERE clause. That said, I have opened a 
ticket, https://issues.apache.org/jira/browse/HUDI-1116, to track this 
request. My initial thinking is that this should not be hard to support. 
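
A rough sketch of that reformatting idea, assuming a hive-synced table and 
hudi's yyyyMMddHHmmss commit-time format; the table name is a placeholder:

  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

  // Render the desired point in time in hudi's commit-time format and
  // filter on the commit metadata column (table name is hypothetical).
  val asOf = LocalDateTime.of(2020, 7, 1, 11, 0, 0)
    .format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))   // "20200701110000"

  val snapshot = spark.sql(
    s"SELECT * FROM hudi_events WHERE _hoodie_commit_time <= '$asOf'")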

Balaji.V  

