[ 
https://issues.apache.org/jira/browse/HUDI-8692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17914207#comment-17914207
 ] 

Davis Zhang edited comment on HUDI-8692 at 1/18/25 8:05 PM:
------------------------------------------------------------

h2. *There is a design gap for multi-stream writer*

As of now if I directly run the test it throw exception

 
{code:java}
java.lang.NumberFormatException: For input string: "topic1,0:10,1:10"
        at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)   
     at java.lang.Long.parseLong(Long.java:589)      at 
java.lang.Long.parseLong(Long.java:631)      at 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.lambda$getNextFilePathsAndMaxModificationTime$0(DFSPathSelector.java:130)
  at org.apache.hudi.common.util.Option.map(Option.java:112)      at 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.getNextFilePathsAndMaxModificationTime(DFSPathSelector.java:130)
   at 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.getNextFilePathsAndMaxModificationTime(DFSPathSelector.java:113)
   at 
org.apache.hudi.utilities.sources.ParquetDFSSource.fetchNextBatch(ParquetDFSSource.java:52)
  at 
org.apache.hudi.utilities.sources.RowSource.readFromCheckpoint(RowSource.java:61)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:139)  at 
org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:184)
        at 
org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:710)
  at 
org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:582)
  at 
org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:554)
    at 
org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:464)  at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:903)
      at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:223) 
     at 
org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer.testDeltaStreamerMultiwriterCheckpoint(TestHoodieDeltaStreamer.java:2048){code}
The test tries to do:
 * parquet DS write to hudi table

 
{code:java}
  "extraMetadata": {
    "map": {
      "schema": 
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"}]}",
      "streamer.checkpoint.key.v2": "1737136458028"
    }
  },{code}
 
 * Json source DS writes to the same table

 
{code:java}
  "extraMetadata": {
    "map": {
      "schema": 
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"}]}",
      "streamer.checkpoint.key.v2": "topic1,0:10,1:10"
    }
  }{code}
 
 * parquet DS write to hudi table again <--- error out here.

 

It is apparent that there is a design gap of multi-writer - Previously we use a 
singleton attribute for tracking the checkpoint as there should be only 1 data 
source all the time. There is only 1 cursor over 1 queue.

 

Now as we write multiple from data sources, various writers will trash each 
other as the overload the "streamer.checkpoint.key.v2" for tracking their own 
state. It is lucky we error out here as different data source have different 
checkpoint format, preventing from actually writing anything downstream. If 
they are of the same data source type, it simply means data corruption as from 
the perspective of 1 data source, it's offset is arbitrarily reset to some 
random place, leading to either invalid offset or offset pointing to data 
already processed.

 
h2. Proposed AI

This is non-trivial. We should track it as a Jira issue and plan for design, 
dev and test properly in future sprints.
h2. *Design for closing the gap*

To fix this issue, we need a multi-dimension way of tracking checkpoints. 
Instead of just tracking offset, we need (data source id, offset). 2 questions:

*[Id registry for data sources]*

How to track id for a data source. When do we identify a stream as an existing 
stream and when do we decide to this is a new stream and id should be assigned 
accordingly. Do we allow removal of a stream. Especially needs to watch out how 
this interacts with today's functionality like clean and restart.

 

*[physical form of storing (data source id, offset) info in instant]*

What's the format inside the instant file that we should keep this piece of 
information.

 
h3. *Id registry for data sources*

When user created a delta streamer, 2 things are created:
 * A hudi table holding data from the data source.
 * A data pipe that piping data from source to the hudi table.

 

When we do clean and restarts, both are reset.

 

Now with multi-streamer we are actually expanding the "data pipe" to be 
multiple ones. It implies the functionality of:
 * Adding a data pipe with unique id
 * Removing a data pipe given an id
 * A registry tracking (id, offset)

So it requires user facing API for the first 2.
{code:java}
// It should be almost the same as today's API of creating delta streamer, just 
reusing some existing hudi table
AddDataSource(hudi_table_identifier, data_pipe_id, various_data_pipe_configs)

// Once removed, no more writes happens to the data pipe. Ongoing writes are 
interrupted at a best effort.
RemoveDataSource(hudi_table_identifier, data_pipe_id){code}
Of course, the hudi streamer code needs to accommodate the `data_pipe_id` 
whenever it tries to access some data pipe specific state - checkpoint is one 
example. More can come as we investigate the code.

 

For the third one it will be discussed in a separate section.

 

{*}physical form of storing (data source id, offset) info in instant{*}{*}{{*}}

Propose: streamer.checkpoint.key.v2.$streamId -> offset

It requires the streamId to conform to [a-z][a-z0-9_]*

 

 

*How to make sure the latest instant contains the latest checkpoint for all 
streamers*

If every streamer just read the last instant file for checkpoint and at commit 
time write a new instant file which only contains their own updated offset, we 
might lose the information of concurrent offset update from other streamers in 
that latest written instant file, which only comes from just 1 of the streamers.

 

 

Several options:

*1. Serialized offset registry update at validation phase*

Regardless of OCC or NBCC, the validation phase is guaranteed to be serially 
executed. At that time, we can read the latest instant and combine with the 
updated offset.Because of serial execution, every instant file we write is 
guaranteed to contain the latest checkpoint offset for all streamers.

 

We applied the same technic in my concurrent schema evolution conflict 
detection RFC 82 - we need to write the latest table schema in every 
commit/delta commit/replacement commit metadata, table schema is a shared state 
subject to concurrent change of multi-writers just like the checkpoint offsets 
registry.

 

What we do is at validation phase read the latest table schema and then combine 
with the current writer schema to generate the updated latest schema. 

 

Of course, we need all writers follow such validation commit metadata 
validation protocol. Otherwise we need to filter out instants written by none 
compliant writers.

 

Pros and cons?

 

2. For each streamer they only update their own checkpoint at needed without 
combining with the latest state.

 

It means in order to find the last checkpoint, we need to track instant that 
contains the right streamer id.

But need to think of archive case / we need to read instant files from long 
time ago.

Also we would need to read 2 instant files - one for checkpoint tracking and 
the other for whatever the latest info we need from the last completed 
checkpoint.

 

 

*Some discussion*
hey man.
[11:54|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230079861479]
Sivabalan
it may not be apples to apples in my opinion.
[11:54|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230086638559]
Sivabalan
we can def try that route.
[11:55|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230111518419]
Sivabalan
in case of schema, there is only one schema for the entire table. irrespective 
of diff sources we read from.
[11:55|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230146116409]
Sivabalan
where as, wrt checkpoint, each source has a diff checkpoint. So, its not 
strictly required to maintain every diff source's checkpoint in every commit 
metadata.
[11:56|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230189842579]
Sivabalan
but wrt schema, we ought to, since the next writer (irrespective of which 
source it reads from), had to know the latest schema even if previous commit 
was from a diff writer.
[11:57|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230270443999]
Sivabalan
but I see your point that, anyways to do conflict resolution, we take lock, 
reload the entire timeline and do some validation. So, there is no additional 
overhead in computing a checkpoint map and updating the new commit metadata.
!https://ca.slack-edge.com/T01QS1D2SET-U072WD6ETK2-10e6a9335419-48!
Davis ZhangDavis Zhang  [11:57 
AM|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230277087959]

so even for table schema, not all writers write schema field - delta commit 
that only update checkpoints does not even come with any schema. In my 
implementation those instants are filtered out. Similar instants comes from 
table services are also ignored.
[11:58|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230292406409]
Davis Zhang
it requires filtering of instants.
!https://ca.slack-edge.com/T01QS1D2SET-U01RCA55350-354289012610-48!
SivabalanSivabalan  [11:58 
AM|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230316204929]

only case we might wanna consider is. 2 sources writing serially writing to 
same hudi table. So, no multi-writers involved.
[11:58|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230330147729]
Sivabalan
I mean, no concurrent writers and users have confiugred just single writer.
[11:59|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230350562219]
Sivabalan
but from an orchestration standpoint, they plan to execute 1 source followed by 
2nd source.
[11:59|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230384228199]
Sivabalan
 
every 1 hour consume data from source1 and ingest consume data from source2 and 
ingest sleep until 1hour elapses. go to next round.
!https://ca.slack-edge.com/T01QS1D2SET-U072WD6ETK2-10e6a9335419-48!
Davis ZhangDavis Zhang  [11:59 
AM|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230388345269]

I know this multi-streamer stuff is not our target and priority. But in case 
you want to discuss more we can huddle (edited) 
!https://ca.slack-edge.com/T01QS1D2SET-U01RCA55350-354289012610-48!
SivabalanSivabalan  [12:00 
PM|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230408357859]

so, in this set up, user may not have configured multi-writers at all.
[12:00|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230451570089]
Sivabalan
I am busy w/ something man.
[12:01|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230465780479]
Sivabalan
its not urgent. we can discuss this early next week.
!https://ca.slack-edge.com/T01QS1D2SET-U072WD6ETK2-10e6a9335419-48!
Davis ZhangDavis Zhang  [12:01 
PM|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230470254129]

yeah, I guess so. No worries at all!
[12:01|https://onehouseinc.slack.com/archives/D072TEUM9NV/p1737230497236679]
Davis Zhang
Thanks for sharing your feedbacks. I will just put these information to archive 
for future reference.
!https://slack-imgs.com/?c=1&o1=gu&url=https%3A%2F%2Femoji.slack-edge.com%2FT01QS1D2SET%2Fack%2F4b55fdd8a9127c0f.png!!https://a.slack-edge.com/production-standard-emoji-assets/14.0/apple-small/[email protected]!!https://a.slack-edge.com/production-standard-emoji-assets/14.0/apple-small/[email protected]!
 
 
 
 

 

 

 


was (Author: JIRAUSER305408):
h2. *There is a design gap for multi-stream writer*

As of now if I directly run the test it throw exception

 
{code:java}
java.lang.NumberFormatException: For input string: "topic1,0:10,1:10"
        at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)   
     at java.lang.Long.parseLong(Long.java:589)      at 
java.lang.Long.parseLong(Long.java:631)      at 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.lambda$getNextFilePathsAndMaxModificationTime$0(DFSPathSelector.java:130)
  at org.apache.hudi.common.util.Option.map(Option.java:112)      at 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.getNextFilePathsAndMaxModificationTime(DFSPathSelector.java:130)
   at 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.getNextFilePathsAndMaxModificationTime(DFSPathSelector.java:113)
   at 
org.apache.hudi.utilities.sources.ParquetDFSSource.fetchNextBatch(ParquetDFSSource.java:52)
  at 
org.apache.hudi.utilities.sources.RowSource.readFromCheckpoint(RowSource.java:61)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:139)  at 
org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:184)
        at 
org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:710)
  at 
org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:582)
  at 
org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:554)
    at 
org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:464)  at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:903)
      at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:223) 
     at 
org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer.testDeltaStreamerMultiwriterCheckpoint(TestHoodieDeltaStreamer.java:2048){code}
The test tries to do:
 * parquet DS write to hudi table

 
{code:java}
  "extraMetadata": {
    "map": {
      "schema": 
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"}]}",
      "streamer.checkpoint.key.v2": "1737136458028"
    }
  },{code}
 
 * Json source DS writes to the same table

 
{code:java}
  "extraMetadata": {
    "map": {
      "schema": 
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"}]}",
      "streamer.checkpoint.key.v2": "topic1,0:10,1:10"
    }
  }{code}
 
 * parquet DS write to hudi table again <--- error out here.

 

It is apparent that there is a design gap of multi-writer - Previously we use a 
singleton attribute for tracking the checkpoint as there should be only 1 data 
source all the time. There is only 1 cursor over 1 queue.

 

Now as we write multiple from data sources, various writers will trash each 
other as the overload the "streamer.checkpoint.key.v2" for tracking their own 
state. It is lucky we error out here as different data source have different 
checkpoint format, preventing from actually writing anything downstream. If 
they are of the same data source type, it simply means data corruption as from 
the perspective of 1 data source, it's offset is arbitrarily reset to some 
random place, leading to either invalid offset or offset pointing to data 
already processed.

 
h2. Proposed AI

This is non-trivial. We should track it as a Jira issue and plan for design, 
dev and test properly in future sprints.
h2. *Design for closing the gap*

To fix this issue, we need a multi-dimension way of tracking checkpoints. 
Instead of just tracking offset, we need (data source id, offset). 2 questions:

*[Id registry for data sources]*

How to track id for a data source. When do we identify a stream as an existing 
stream and when do we decide to this is a new stream and id should be assigned 
accordingly. Do we allow removal of a stream. Especially needs to watch out how 
this interacts with today's functionality like clean and restart.

 

*[physical form of storing (data source id, offset) info in instant]*

What's the format inside the instant file that we should keep this piece of 
information.

 
h3. *Id registry for data sources*

When user created a delta streamer, 2 things are created:
 * A hudi table holding data from the data source.
 * A data pipe that piping data from source to the hudi table.

 

When we do clean and restarts, both are reset.

 

Now with multi-streamer we are actually expanding the "data pipe" to be 
multiple ones. It implies the functionality of:
 * Adding a data pipe with unique id
 * Removing a data pipe given an id
 * A registry tracking (id, offset)

So it requires user facing API for the first 2.
{code:java}
// It should be almost the same as today's API of creating delta streamer, just 
reusing some existing hudi table
AddDataSource(hudi_table_identifier, data_pipe_id, various_data_pipe_configs)

// Once removed, no more writes happens to the data pipe. Ongoing writes are 
interrupted at a best effort.
RemoveDataSource(hudi_table_identifier, data_pipe_id){code}
Of course, the hudi streamer code needs to accommodate the `data_pipe_id` 
whenever it tries to access some data pipe specific state - checkpoint is one 
example. More can come as we investigate the code.

 

For the third one it will be discussed in a separate section.

 

{*}physical form of storing (data source id, offset) info in instant{*}{*}{{*}}

Propose: streamer.checkpoint.key.v2.$streamId -> offset

It requires the streamId to conform to [a-z][a-z0-9_]*

 

> Re-enable TestHoodieDeltaStreamer#testDeltaStreamerMultiwriterCheckpoint
> ------------------------------------------------------------------------
>
>                 Key: HUDI-8692
>                 URL: https://issues.apache.org/jira/browse/HUDI-8692
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: tests-ci
>            Reporter: Y Ethan Guo
>            Assignee: Davis Zhang
>            Priority: Blocker
>             Fix For: 1.0.1
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Related: HUDI-6609



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to