[DISCUSS] split source of kafka partition by count

2023-03-30 Thread 孔维
Hi team, for the Kafka source, when pulling data from Kafka, the default
parallelism is the number of Kafka partitions.
There are cases where this is not enough:

1. Pulling a large amount of data from Kafka (e.g. maxEvents=1) when the number
of Kafka partitions is too small: the pull takes too much time, or worse,
causes executor OOMs.
2. There is heavy data skew between the Kafka partitions: the pull is blocked
by the slowest partition.

To solve those cases, I want to add a parameter,
hoodie.deltastreamer.kafka.per.batch.maxEvents, to control the maximum number
of events in one Kafka batch. The default of Long.MAX_VALUE means the feature
is turned off. This configuration takes effect after the
hoodie.deltastreamer.kafka.source.maxEvents config is applied.
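
To make the proposal concrete, here is a rough sketch of the splitting logic I
have in mind (class and method names are purely illustrative, not the actual
Hudi source): each Kafka partition's offset range would be cut into sub-ranges
of at most the configured number of events, so a hot or under-partitioned topic
can fan out across more Spark tasks.

    // Illustrative sketch only (hypothetical names, not real Hudi classes):
    // split one Kafka partition's [fromOffset, untilOffset) range into
    // sub-ranges of at most maxEventsPerBatch records, so each sub-range can
    // become its own Spark task instead of one task per Kafka partition.
    import java.util.ArrayList;
    import java.util.List;

    public class KafkaOffsetRangeSplitter {

      /** Simple holder for a (topic, partition, from, until) sub-range. */
      public static final class SubRange {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        SubRange(String topic, int partition, long fromOffset, long untilOffset) {
          this.topic = topic;
          this.partition = partition;
          this.fromOffset = fromOffset;
          this.untilOffset = untilOffset;
        }
      }

      public static List<SubRange> split(String topic, int partition,
                                         long fromOffset, long untilOffset,
                                         long maxEventsPerBatch) {
        List<SubRange> ranges = new ArrayList<>();
        long start = fromOffset;
        while (start < untilOffset) {
          // compute the step from the remaining count to avoid long overflow
          // when maxEventsPerBatch is Long.MAX_VALUE
          long step = Math.min(untilOffset - start, maxEventsPerBatch);
          long end = start + step;
          ranges.add(new SubRange(topic, partition, start, end));
          start = end;
        }
        return ranges;
      }
    }

With the default of Long.MAX_VALUE, the loop yields a single sub-range per
partition, which preserves today's behavior.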


Here is my POC of the improvement:
Max executor cores is 128.
Feature off (hoodie.deltastreamer.kafka.source.maxEvents=5000):


Feature on (hoodie.deltastreamer.kafka.per.batch.maxEvents=20):


After turning the feature on, the Tagging time dropped from 4.4 minutes to 1.1
minutes, and it could be even faster given more cores.

What do you think? Can I file a JIRA issue for this?

Re: [DISCUSS] Hudi Reverse Streamer

2023-03-30 Thread Prashant Wason
Could be useful. It may also be useful for a backup/replication scenario
(keeping a copy of the data in an alternate/cloud DC).

HoodieDeltaStreamer already has the concept of "sources". This can be
implemented as a "sink" concept.
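
A minimal sketch of what such a sink abstraction might look like (a
hypothetical interface made up for discussion, not an existing Hudi API):

    // Hypothetical "sink" counterpart to HoodieDeltaStreamer's Source
    // abstraction; the interface name and signature are assumptions for
    // illustration only.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public interface ReverseStreamerSink {

      /**
       * Writes one incrementally-pulled batch of a Hudi table to the target
       * system (Kafka, a JDBC database, DFS, ...) and returns the checkpoint
       * (e.g. the last consumed Hudi commit time) to resume from next run.
       */
      String writeBatch(Dataset<Row> incrementalBatch, String lastCheckpoint);
    }

Backup/replication to an alternate DC would then just be another sink
implementation.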

On Thu, Mar 30, 2023 at 8:12 PM Vinoth Chandar  wrote:

> Essentially.
>
> Old architecture :(operational database) ==> some tool ==> (data
> warehouse raw data) ==> SQL ETL ==> (data warehouse derived data)
>
> New architecture : (operational database) ==> Hudi delta Streamer ==> (Hudi
> raw data) ==> Spark/Flink Hudi ETL ==> (Hudi derived data) ==> Hudi Reverse
> Streamer ==> (Data Warehouse/Kafka/Operational Database)
>
> On Thu, Mar 30, 2023 at 8:09 PM Vinoth Chandar  wrote:
>
> > Hi all,
> >
> > Any interest in building a reverse streaming tool, that does the reverse
> > of what the DeltaStreamer tool does? It will read Hudi table
> incrementally
> > (only source) and write out the data to a variety of sinks - Kafka, JDBC
> > Databases, DFS.
> >
> > This has come up many times with data warehouse users. Often times, they
> > want to use Hudi to speed up or reduce costs on their data ingestion and
> > ETL (using Spark/Flink), but want to move the derived data back into a
> data
> > warehouse or an operational database for serving.
> >
> > What do you all think?
> >
> > Thanks
> > Vinoth
> >
>


Re: Re: Re: DISCUSS

2023-03-30 Thread Vinoth Chandar
I think we can focus on validating the hash index + bloom filter vs the
consistent hash index first. Have you looked at RFC-08? It is a kind of hash
index as well, except that it stores the key => file group mapping
externally.
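
For readers following the quoted thread below: the "hash partition + Bloom
filter" proposal boils down to deriving a partition value from the record key,
e.g. bucket = hash(recordKey) mod N, so a lookup only consults the Bloom
filters of files under that one partition. A minimal sketch, assuming a simple
modulo scheme (illustrative only, not tied to any existing Hudi class):

    // Minimal sketch of the hash-partition idea discussed below; purely
    // illustrative, not Hudi code.
    public final class HashPartitionSketch {

      static int hashPartition(String recordKey, int numHashPartitions) {
        // floorMod keeps the result in [0, numHashPartitions) even for
        // negative hash codes.
        return Math.floorMod(recordKey.hashCode(), numHashPartitions);
      }

      public static void main(String[] args) {
        // e.g. a record keyed by customer id lands in one of 64 hash partitions
        System.out.println(hashPartition("customer_12345", 64));
      }
    }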

On Fri, Mar 24, 2023 at 2:14 AM 吕虎  wrote:

> Hi Vinoth, I am very happy to receive your reply. Here are some of my
> thoughts.
>
> At 2023-03-21 23:32:44, "Vinoth Chandar"  wrote:
> >>but when it is used for data expansion, it still involves the need to
> >redistribute the data records of some data files, thus affecting the
> >performance.
> >but expansion of the consistent hash index is an optional operation right?
>
> >Sorry, not still fully understanding the differences here,
> I'm sorry I didn't make myself clear. The expansion I mentioned last
> time refers to the growth of data records in a Hudi table.
> The difference between the consistent hash index and the hash partition
> with Bloom filter index is how they deal with data growth:
> The consistent hash index handles growth by splitting files. Splitting
> files affects performance, but it keeps working effectively indefinitely.
> So the consistent hash index is suitable for scenarios where data growth
> cannot be estimated, or where the data will grow very large.
> The hash partition with Bloom filter index handles growth by creating new
> files. Adding new files does not affect performance, but if there are too
> many files, the probability of false positives in the Bloom filters will
> increase. So the hash partition with Bloom filter index is suitable for
> scenarios where data growth can be estimated within a relatively small range.
>
>
> >>Because the hash partition field values under the parquet file in a
> >columnar storage format are all equal, the added column field hardly
> >occupies storage space after compression.
> >Any new meta field adds other overhead in terms of evolving the schema, and
> >so forth. Are you suggesting this is not possible to do without a new meta
> >field?
>
> An implementation without a new meta field would be more elegant, but for
> me, not yet being familiar with the Hudi source code, it is somewhat
> difficult to implement; it should not be a problem for experts, though. If
> it is implemented without adding a new meta field, I hope I can participate
> in some simple development and also learn how the experts do it.
>
>
> >On Thu, Mar 16, 2023 at 2:22 AM 吕虎  wrote:
> >
> >> Hello,
> >>  I feel very honored that you are interested in my views.
> >>
> >>  Here are some of my thoughts marked with blue font.
> >>
> >> At 2023-03-16 13:18:08, "Vinoth Chandar"  wrote:
> >>
> >> >Thanks for the proposal! Some first set of questions here.
> >> >
> >> >>You need to pre-select the number of buckets and use the hash
> function to
> >> >determine which bucket a record belongs to.
> >> >>when building the table according to the estimated amount of data,
> and it
> >> >cannot be changed after building the table
> >> >>When the amount of data in a hash partition is too large, the data in
> >> that
> >> >partition will be split into multiple files in the way of Bloom index.
> >> >
> >> >All these issues related to bucket sizing could be alleviated by the
> >> >consistent hashing index in 0.13? Have you checked it out? Love to hear
> >> >your thoughts on this.
> >>
> >> Hash partitioning is applicable to tables whose exact data volume cannot
> >> be given, but for which a rough range can be estimated. For example, if a
> >> company currently has 300 million customers in the United States, it will
> >> have at most 7 billion customers worldwide. In this scenario, hash
> >> partitioning can cope with data growth within the known range by directly
> >> adding files and building Bloom filters, and can still guarantee
> >> performance.
> >> The consistent hash bucket index is also very valuable, but when it
> >> handles data growth, it still needs to redistribute the records of some
> >> data files, which affects performance. When it is completely impossible
> >> to estimate the range of data volume, consistent hashing is very suitable.
> >> >>you can directly search the data under the partition, which greatly
> >> >>reduces the scope of the Bloom filter to search for files and reduces
> >> >>the false positives of the Bloom filter.
> >> >the bloom index is already partition aware and, unless you use the
> >> >global version, can achieve this. Am I missing something?
> >> >
> >> >Another key thing is - if we can avoid adding a new meta field, that
> >> >would be great. Is it possible to implement this similar to the bucket
> >> >index, based on just table properties?
> >> Adding a hash partition field to the table to implement the hash
> >> partition function reuses the existing partitioning functionality well
> >> and involves very few code changes. Because the hash partition field
> >> values within a parquet file in a columnar storage format are all equal,
> >> the added column field hardly occupies any storage space after
> >> compression.

Re: When using the HoodieDeltaStreamer, is there a corresponding parameter that can control the number of cycles? For example, if I cycle 5 times, I stop accessing data

2023-03-30 Thread Vinoth Chandar
I believe there is no control today. You could hack a precommit validator
and call System.exit if you want ;) (ugly, I know)

But maybe we could introduce some abstraction to do a check between loops,
or allow users to plug in some logic to decide whether to continue or exit?

Love to understand the use-case more here.
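
For concreteness, a minimal sketch of the kind of pluggable check meant above
(a hypothetical interface, not something that exists in Hudi today):

    // Hypothetical "check between loops" hook for DeltaStreamer's continuous
    // mode; the interface name and signature are assumptions only.
    public interface ContinuousModeTerminationCheck {

      /**
       * Called after each source-fetch -> transform -> write cycle.
       *
       * @param completedCycles number of cycles finished so far
       * @return true if the continuous loop should stop gracefully
       */
      boolean shouldShutdown(long completedCycles);
    }

    // Example: stop after a fixed number of cycles (e.g. the 5 asked about below).
    class StopAfterFixedCycles implements ContinuousModeTerminationCheck {
      private final long maxCycles;

      StopAfterFixedCycles(long maxCycles) {
        this.maxCycles = maxCycles;
      }

      @Override
      public boolean shouldShutdown(long completedCycles) {
        return completedCycles >= maxCycles;
      }
    }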

On Wed, Mar 29, 2023 at 7:32 AM lee  wrote:

> When I use the HoodieDeltaStreamer, the "--continuous" parameter says: "Delta
> Streamer runs in continuous mode running source-fetch -> Transform -> Hudi
> Write in loop". So I would like to ask if there are any corresponding
> parameters that can control the number of cycles, such as stopping
> ingesting data after I have cycled 5 times.
>
>
>
> 李杰
> leedd1...@163.com
>
> 
>


Re: [DISCUSS] Hudi Reverse Streamer

2023-03-30 Thread Vinoth Chandar
Essentially.

Old architecture :(operational database) ==> some tool ==> (data
warehouse raw data) ==> SQL ETL ==> (data warehouse derived data)

New architecture : (operational database) ==> Hudi delta Streamer ==> (Hudi
raw data) ==> Spark/Flink Hudi ETL ==> (Hudi derived data) ==> Hudi Reverse
Streamer ==> (Data Warehouse/Kafka/Operational Database)

On Thu, Mar 30, 2023 at 8:09 PM Vinoth Chandar  wrote:

> Hi all,
>
> Any interest in building a reverse streaming tool, that does the reverse
> of what the DeltaStreamer tool does? It will read Hudi table incrementally
> (only source) and write out the data to a variety of sinks - Kafka, JDBC
> Databases, DFS.
>
> This has come up many times with data warehouse users. Often times, they
> want to use Hudi to speed up or reduce costs on their data ingestion and
> ETL (using Spark/Flink), but want to move the derived data back into a data
> warehouse or an operational database for serving.
>
> What do you all think?
>
> Thanks
> Vinoth
>


[DISCUSS] Hudi Reverse Streamer

2023-03-30 Thread Vinoth Chandar
Hi all,

Any interest in building a reverse streaming tool, that does the reverse of
what the DeltaStreamer tool does? It will read Hudi table incrementally
(only source) and write out the data to a variety of sinks - Kafka, JDBC
Databases, DFS.

This has come up many times with data warehouse users. Often times, they
want to use Hudi to speed up or reduce costs on their data ingestion and
ETL (using Spark/Flink), but want to move the derived data back into a data
warehouse or an operational database for serving.

What do you all think?

Thanks
Vinoth