[
https://issues.apache.org/jira/browse/HUDI-8692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17914207#comment-17914207
]
Davis Zhang edited comment on HUDI-8692 at 1/17/25 6:22 PM:
------------------------------------------------------------
h2. *There is a design gap for multi-stream writer*
As of now, if I directly run the test, it throws an exception:
{code:java}
java.lang.NumberFormatException: For input string: "topic1,0:10,1:10"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at org.apache.hudi.utilities.sources.helpers.DFSPathSelector.lambda$getNextFilePathsAndMaxModificationTime$0(DFSPathSelector.java:130)
	at org.apache.hudi.common.util.Option.map(Option.java:112)
	at org.apache.hudi.utilities.sources.helpers.DFSPathSelector.getNextFilePathsAndMaxModificationTime(DFSPathSelector.java:130)
	at org.apache.hudi.utilities.sources.helpers.DFSPathSelector.getNextFilePathsAndMaxModificationTime(DFSPathSelector.java:113)
	at org.apache.hudi.utilities.sources.ParquetDFSSource.fetchNextBatch(ParquetDFSSource.java:52)
	at org.apache.hudi.utilities.sources.RowSource.readFromCheckpoint(RowSource.java:61)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:139)
	at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:184)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:710)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:582)
	at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:554)
	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:464)
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:903)
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:223)
	at org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer.testDeltaStreamerMultiwriterCheckpoint(TestHoodieDeltaStreamer.java:2048){code}
The test does the following:
* Parquet DFS source writes to the Hudi table
{code:json}
"extraMetadata": {
  "map": {
    "schema": "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"}]}",
    "streamer.checkpoint.key.v2": "1737136458028"
  }
},{code}
* JSON source writes to the same table
{code:json}
"extraMetadata": {
  "map": {
    "schema": "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"}]}",
    "streamer.checkpoint.key.v2": "topic1,0:10,1:10"
  }
}{code}
* Parquet DFS source writes to the Hudi table again <--- errors out here.
The design gap in multi-writer is apparent. Previously we used a single
attribute to track the checkpoint, since there was assumed to be exactly one
data source at all times: one cursor over one queue.
Now that we write from multiple data sources, the writers trash each other's
state, because they all overload "streamer.checkpoint.key.v2" to track their
own checkpoint. We are lucky to error out here: the two data sources use
different checkpoint formats, which prevents anything from actually being
written downstream. If the sources were of the same type, this would instead
mean silent data corruption - from the perspective of one data source, its
offset is arbitrarily reset to some random place, leading to either an invalid
offset or an offset pointing to data already processed.
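The clash can be reproduced in isolation. Below is a minimal, hypothetical sketch (not the actual Hudi code) of what {{DFSPathSelector}} effectively does: it parses the shared checkpoint value as a modification-time long, so a Kafka-style checkpoint written by the other stream into the same key fails immediately with exactly the exception above:

```java
public class CheckpointClashDemo {

    // Hypothetical stand-in for DFSPathSelector's checkpoint handling:
    // the shared checkpoint value is treated as a modification-time cursor.
    static long parseDfsCheckpoint(String checkpoint) {
        return Long.parseLong(checkpoint);
    }

    public static void main(String[] args) {
        // Checkpoint last written by the Parquet DFS source: parses fine.
        System.out.println(parseDfsCheckpoint("1737136458028"));

        // Checkpoint last written by the JSON/Kafka-style source into the
        // SAME "streamer.checkpoint.key.v2" key: blows up.
        try {
            parseDfsCheckpoint("topic1,0:10,1:10");
        } catch (NumberFormatException e) {
            System.out.println("clash: " + e.getMessage());
        }
    }
}
```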
h2. *Proposed action item*
This is non-trivial. We should track it as a Jira issue and plan for design,
development, and testing properly in future sprints.
h2. *Design for closing the gap*
To fix this issue, we need a multi-dimensional way of tracking checkpoints:
instead of tracking just an offset, we need to track (data source id, offset)
pairs. This raises two questions:
*[Id registry for data sources]*
How do we track the id of a data source? When do we identify a stream as an
existing stream, and when do we decide it is a new stream that should be
assigned a fresh id? Do we allow removal of a stream? We especially need to
watch how this interacts with today's functionality such as clean and restart.
*[Physical form of storing (data source id, offset) info in the instant]*
What format should we use inside the instant file to keep this piece of
information?
h3. *Id registry for data sources*
When a user creates a delta streamer, two things are created:
* A Hudi table holding data from the data source.
* A data pipe piping data from the source to the Hudi table.
When we do clean and restart, both are reset.
Now with the multi-streamer we are expanding the single "data pipe" into
multiple ones. That implies the following functionality:
* Adding a data pipe with a unique id
* Removing a data pipe given an id
* A registry tracking (id, offset)
The first two require user-facing APIs.
{code:java}
// It should be almost the same as today's API of creating a delta streamer,
// just reusing some existing hudi table.
AddDataSource(hudi_table_identifier, data_pipe_id, various_data_pipe_configs)

// Once removed, no more writes happen to the data pipe. Ongoing writes are
// interrupted on a best-effort basis.
RemoveDataSource(hudi_table_identifier, data_pipe_id){code}
Of course, the Hudi streamer code needs to accommodate the {{data_pipe_id}}
whenever it accesses data-pipe-specific state; the checkpoint is one example,
and more may surface as we investigate the code.
The third item, the registry, is discussed in the next section.
h3. *Physical form of storing (data source id, offset) info in the instant*
Proposal: {{streamer.checkpoint.key.v2.$streamId}} -> offset
This requires the streamId to conform to {{[a-z][a-z0-9_]*}}.
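As a sketch of how the namespaced keys would behave (hypothetical helper names, not actual Hudi APIs): each stream reads and writes only its own entry in the commit's extra metadata, so the two pipes from the failing test no longer clobber each other, and stream ids are validated against the proposed pattern:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class PerStreamCheckpoints {
    private static final String KEY_PREFIX = "streamer.checkpoint.key.v2.";
    private static final Pattern STREAM_ID = Pattern.compile("[a-z][a-z0-9_]*");

    // Stand-in for the commit's extraMetadata map.
    static final Map<String, String> extraMetadata = new HashMap<>();

    // Writes this stream's checkpoint without touching other streams' entries.
    static void putCheckpoint(String streamId, String offset) {
        if (!STREAM_ID.matcher(streamId).matches()) {
            throw new IllegalArgumentException("invalid streamId: " + streamId);
        }
        extraMetadata.put(KEY_PREFIX + streamId, offset);
    }

    static String getCheckpoint(String streamId) {
        return extraMetadata.get(KEY_PREFIX + streamId);
    }

    public static void main(String[] args) {
        // The two pipes from the failing test now keep separate checkpoints.
        putCheckpoint("parquet_pipe", "1737136458028");
        putCheckpoint("json_pipe", "topic1,0:10,1:10");
        System.out.println(getCheckpoint("parquet_pipe"));
        System.out.println(getCheckpoint("json_pipe"));
    }
}
```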
> Re-enable TestHoodieDeltaStreamer#testDeltaStreamerMultiwriterCheckpoint
> ------------------------------------------------------------------------
>
> Key: HUDI-8692
> URL: https://issues.apache.org/jira/browse/HUDI-8692
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: tests-ci
> Reporter: Y Ethan Guo
> Assignee: Davis Zhang
> Priority: Blocker
> Fix For: 1.0.1
>
>
> Related: HUDI-6609
--
This message was sent by Atlassian Jira
(v8.20.10#820010)