Re: Spark v2.3.2 : Duplicate entries found for each primary Key

2019-11-19 Thread Pratyaksh Sharma
Thank you for the explanation Kabeer/Sudha.

Let me go through the flow and revert back in case of any further queries.

On Wed, Nov 20, 2019 at 6:21 AM Kabeer Ahmed  wrote:

> Pratyaksh,
>
> +1 to what Sudha has written. Let's zoom in a bit closer.
> For Hive, as you said, we explicitly set the input format to
> HoodieParquetInputFormat.
> - HoodieParquetInputFormat extends MapredParquetInputFormat, which is
> nothing but an input format for Hive. Hive and Presto depend on this class
> to retrieve the dataset from Hudi.
>
> For Spark, there is no such option to set this explicitly. Rather, Spark
> starts reading the paths directly from the file system (HDFS or S3). From
> Spark the calls would be as below:
> - org.apache.spark.rdd.NewHadoopRDD.getPartitions
> - org.apache.parquet.hadoop.ParquetInputFormat.getSplits
> - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits
>
> Now it is evident that we can't plug in the HoodieParquetInputFormat.
> Rather, we rely on the PathFilter class that allows us to filter out the
> paths (and files). So we explicitly set this in the Spark Hadoop
> configuration (note that Spark uses the Hadoop FS S3 implementation to read
> from S3).
>
> If you look into HoodieROTablePathFilter, you will see that there is logic
> to ensure that, for Hoodie-related folders (paths) and files, only the
> latest path/file is selected. Thus you do not see duplicate entries once
> you set it. Without this, Spark just plainly reads all the parquet files
> and displays the data within them.
>
> It may take some time for you to go through these paths and digest the
> flow. But should you still have any questions, please do not hesitate to
> revert back.
>
> Hope this helps
> Kabeer.
>
>
>
> Sent: Monday, November 18, 2019 at 7:47 PM
> From: "Bhavani Sudha" 
> To: dev@hudi.apache.org
> Subject: Re: Spark v2.3.2 : Duplicate entries found for each primary Key
> Hi Pratyaksh,
>
> Let me try to answer this. I believe Spark does not natively invoke
> HoodieParquetInputFormat.getSplits() like Hive and Presto do. So when
> queried, Spark just loads all the data files in that partition without
> applying the Hoodie filtering logic. That's why we need to instruct Spark
> to read in the appropriate format in one of the two ways suggested by
> Vinoth/Kabeer earlier.
>
> Thanks,
> Sudha
>
> On Mon, Nov 18, 2019 at 12:16 AM Pratyaksh Sharma 
> wrote:
>
> > Hi Vinoth/Kabeer,
> >
> > I have one small doubt regarding what you proposed to fix the issue. Why
> > is the HoodieParquetInputFormat class not able to handle deduplication of
> > records in the case of Spark, while it is able to do so in the case of
> > Presto and Hive?
> >
> > On Sun, Nov 17, 2019 at 4:08 AM Vinoth Chandar 
> wrote:
> >
> > > Sweet!
> > >
> > > On Sat, Nov 16, 2019 at 10:16 AM Purushotham Pushpavanthar <
> > > pushpavant...@gmail.com> wrote:
> > >
> > > > Thanks Vinoth and Kabeer. It resolved my problem.
> > > >
> > > > Regards,
> > > > Purushotham Pushpavanth
> > > >
> > > >
> > > >
> > > > On Fri, 15 Nov 2019 at 20:16, Kabeer Ahmed 
> > wrote:
> > > >
> > > > > Adding to Vinoth's response, in spark-shell you just need to copy
> > > > > and paste the below line. Let us know if it still doesn't work.
> > > > >
> > > > >
> > > > > spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
> > > > >   classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
> > > > >   classOf[org.apache.hadoop.fs.PathFilter]);
> > > > > On Nov 15 2019, at 1:37 pm, Vinoth Chandar 
> > wrote:
> > > > > > Hi,
> > > > > >
> > > > > > are you setting the path filters when you query the Hudi Hive table
> > > > > > via Spark
> > > > > > http://hudi.apache.org/querying_data.html#spark-ro-view (or
> > > > > > http://hudi.apache.org/querying_data.html#spark-rt-view
> > > > > > alternatively)?
> > > > > >
> > > > > > - Vinoth
> > > > > > On Fri, Nov 15, 2019 at 5:03 AM Purushotham Pushpavanthar <
> > > > > > pushpavant...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > Below is a create statement on my Hudi dataset.
> > > > > > >
> > > > > > > CREATE EXTERNAL TABLE `inventory`.`customer`(
> > > > > > >   `_hoodie_commit_time` string, `_hoodie_commit_seqno` string,
> > > > > > >   `_hoodie_record_key` string, `_hoodie_partition_path` string,
> > > > > > >   `_hoodie_file_name` string, `id` bigint, `sales` bigint,
> > > > > > >   `merchant` bigint, `item_status` bigint, `tem_shipment` bigint)
> > > > > > > PARTITIONED BY (`dt` string)
> > > > > > > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> > > > > > > WITH SERDEPROPERTIES ('serialization.format' = '1')
> > > > > > > STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInp

Re: Small clarification in Hoodie Cleaner flow

2019-11-19 Thread Pratyaksh Sharma
Thank you for the clarification Balaji. Now I understand it properly. :)

On Tue, Nov 19, 2019 at 11:17 PM Balaji Varadarajan 
wrote:

> I updated the FAQ section to set defaults correctly and add more
> information related to this :
>
> https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-WhatdoestheHudicleanerdo
>
> The cleaner retention configuration is based on counts (number of commits
> to be retained), with the assumption that users need to provide a
> conservative number. The historical reason was that ingestion used to run
> at a specific cadence (e.g. every 30 mins), with the norm being an ingestion
> run taking less than 30 mins. With this model, it was simpler to represent
> the configuration as a count of commits to approximate the retention time.
>
> With delta-streamer continuous mode, ingestion is allowed to be scheduled
> immediately after the previous run is scheduled. I think it would make
> sense to introduce a time based retention. I have created a newbie ticket
> for this : https://jira.apache.org/jira/browse/HUDI-349
>
> Pratyaksh, in sum, if the defaults are too low, use a conservative number
> based on the number of ingestion runs you see in your setup. The defaults
> as referenced in the code comments need to change (from 24 to 10):
> https://jira.apache.org/jira/browse/HUDI-350
>
> Thanks,
> Balaji.V
>
> On Tue, Nov 19, 2019 at 1:40 AM Pratyaksh Sharma 
> wrote:
>
> > Hi,
> >
> > We are assuming the following in the getDeletePaths() method in the
> > cleaner flow, in the case of the KEEP_LATEST_COMMITS policy -
> >
> > /**
> >  * Selects the versions of a file for cleaning, such that it:
> >  * - Leaves the latest version of the file untouched.
> >  * - For older versions, it leaves untouched all the commits that occurred
> >  *   in the last config.getCleanerCommitsRetained() commits, and ONE commit
> >  *   before this window.
> >  * We assume that max(query execution time) == commit_batch_time * config.getCleanerCommitsRetained().
> >  * This is 12 hours by default. This is essential to leave the file used by
> >  * the query that runs for the max time.
> >  *
> >  * This provides the effect of having lookback into all changes that
> >  * happened in the last X commits (e.g. if you retain 24 commits, and commit
> >  * batch time is 30 mins, then you have 12 hrs of lookback).
> >  *
> >  * This policy is the default.
> >  */
> >
> > I want to understand the term commit_batch_time in this assumption, and
> > the assumption as a whole. As per my understanding, this term refers to
> > the time taken by one end-to-end iteration of DeltaSync (which is hardly
> > 7-8 minutes in my case). If my understanding is correct, then this time
> > will vary depending on the size of the incoming RDD, so the time allowed
> > for the longest query is effectively a variable. In that case, what is a
> > safe value to keep for config.getCleanerCommitsRetained()?
> >
> > Basically, I want to set config.getCleanerCommitsRetained() properly for
> > my Hudi instance, and hence I am trying to understand the assumption. Its
> > default value is 10; I want to understand if this can be reduced further
> > without any query failing.
> >
> > Please help me with this.
> >
> > Regards
> > Pratyaksh
> >
>


Re: 20191119 Weekly Meeting

2019-11-19 Thread nishith agarwal
Thanks Bhavani!

-Nishith

On Tue, Nov 19, 2019 at 10:26 PM Bhavani Sudha 
wrote:

> Please find the meeting summary here -
> https://cwiki.apache.org/confluence/x/OxYZC
>
> Thanks,
> Sudha
>
> On Tue, Nov 19, 2019 at 9:06 PM Vinoth Chandar  wrote:
>
> > Hangout link here
> > 
> >
>


Re: 20191119 Weekly Meeting

2019-11-19 Thread Bhavani Sudha
Please find the meeting summary here -
https://cwiki.apache.org/confluence/x/OxYZC

Thanks,
Sudha

On Tue, Nov 19, 2019 at 9:06 PM Vinoth Chandar  wrote:

> Hangout link here
> 
>


20191119 Weekly Meeting

2019-11-19 Thread Vinoth Chandar
Hangout link here



Re: [DISCUSS] Introduce stricter comment and code style validation rules

2019-11-19 Thread Y Ethan Guo
+1 on all of the proposed rules.  These will also make the javadoc more
readable.

On Mon, Nov 18, 2019 at 5:55 PM Vinoth Chandar  wrote:

> +1 on all three.
>
> Would there be an overhaul of existing code to add comments to all classes?
> We are pretty reasonable already, but good to get this in shape.
>
> 17:54:37 [incubator-hudi]$ grep -R -B 1 "public class" hudi-*/src/main/java
> | grep "public class" | wc -l
>  274
> 17:54:50 [incubator-hudi]$ grep -R -B 1 "public class" hudi-*/src/main/java
> | grep "*/" | wc -l
>  178
> 17:55:06 [incubator-hudi]$
>
>
>
>
> On Mon, Nov 18, 2019 at 5:48 PM lamberken  wrote:
>
> > +1, it's hard work but meaningful.
> >
> >
> > lamberken
> > lamber...@163.com
> >
> >
> > On 11/19/2019 07:27, leesf wrote:
> > Hi vino,
> >
> > Thanks for bringing this discussion up.
> > +1 on all. The third one seems a bit too strict and usually requires
> > manual processing of the import order, but I also agree and think it makes
> > our project more professional. And I learned that the Calcite community is
> > also applying this rule.
> >
> > Best,
> > Leesf
> >
> >
> > On Mon, Nov 18, 2019 at 8:53 PM, Pratyaksh Sharma  wrote:
> >
> > Having proper class-level and method-level comments always makes life
> > easier for any new user.
> >
> > +1 for points 1,2 and 4.
> >
> > On Mon, Nov 18, 2019 at 5:59 PM vino yang  wrote:
> >
> > Hi guys,
> >
> > Currently, Hudi's comment and code styles do not have uniform
> > specifications for certain rules, which I will list below. With the rapid
> > development of the community, inconsistent comment conventions will bring
> > a lot of problems. I assume that everyone is aware of their importance, so
> > I will not spend too much time emphasizing it.
> >
> > In short, I want to add more detection rules to the current repository to
> > force everyone to follow a more "strict" code specification.
> >
> > These rules are listed below:
> >
> > 1) All public classes must have class-level comments;
> >
> > 2) All comments must end with a clear ".";
> >
> > 3) In the import statements of a class, clearly distinguish (by blank
> > lines) Java SE imports from non-Java SE imports. Currently, I have seen at
> > least two projects (Spark and Flink) that implement this rule. Flink
> > implements stricter rules than Spark: imports are divided into several
> > blocks from top to bottom (owner imports -> non-owner, non-Java SE imports
> > -> Java SE imports -> static imports), and each block is sorted in the
> > natural order of letters (see the small example further below);
> >
> > 4) Reconfirm that each method and its comment are consistent;
> >
> > The first, second, and third points can be checked by setting check-style
> > rules. The fourth point requires human confirmation.
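> >
> > To make rules 1-3 concrete, a hypothetical class could look like the
> > following (the package, fields, and class name are made up for
> > illustration; the blank lines separate the Flink-style import groups
> > described in rule 3):
> >
> >   package org.apache.hudi.example;
> >
> >   import org.apache.hudi.common.model.HoodieRecord;
> >
> >   import org.apache.hadoop.fs.Path;
> >
> >   import java.util.List;
> >
> >   import static java.util.Objects.requireNonNull;
> >
> >   /**
> >    * Example class-level comment that ends with a period, as rules 1 and 2 require.
> >    */
> >   public class ImportOrderExample {
> >     private final List<HoodieRecord> records;
> >     private final Path basePath;
> >
> >     public ImportOrderExample(List<HoodieRecord> records, Path basePath) {
> >       this.records = requireNonNull(records);
> >       this.basePath = requireNonNull(basePath);
> >     }
> >   }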
> >
> > Regarding the third point, everyone can express their views. According to
> > my personal experience, this strict model used by Flink also brings the
> > best reading experience. But this is a subjective feeling.
> >
> > Additionally, I want to collect more ideas about this topic through this
> > thread and discuss the feasibility of them.
> >
> > Any comments and feedback are welcome.
> >
> > Best,
> > Vino
> >
> >
> >
>


Re: Spark v2.3.2 : Duplicate entries found for each primary Key

2019-11-19 Thread Kabeer Ahmed
Pratyaksh, 

+1 to what Sudha has written. Let's zoom in a bit closer.
For Hive, as you said, we explicitly set the input format to
HoodieParquetInputFormat.
- HoodieParquetInputFormat extends MapredParquetInputFormat, which is nothing
but an input format for Hive. Hive and Presto depend on this class to retrieve
the dataset from Hudi.

For Spark, there is no such option to set this explicitly. Rather, Spark starts
reading the paths directly from the file system (HDFS or S3). From Spark the
calls would be as below:
- org.apache.spark.rdd.NewHadoopRDD.getPartitions
- org.apache.parquet.hadoop.ParquetInputFormat.getSplits
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits

Now it is evident that we can't plug in the HoodieParquetInputFormat. Rather,
we rely on the PathFilter class that allows us to filter out the paths (and
files). So we explicitly set this in the Spark Hadoop configuration (note that
Spark uses the Hadoop FS S3 implementation to read from S3).

If you look into HoodieROTablePathFilter, you will see that there is logic to
ensure that, for Hoodie-related folders (paths) and files, only the latest
path/file is selected. Thus you do not see duplicate entries once you set it.
Without this, Spark just plainly reads all the parquet files and displays the
data within them.
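
To make that concrete, here is a toy sketch in Java (purely illustrative -- the
class and helper method below are made up and are NOT the actual
HoodieROTablePathFilter logic, which consults the Hudi metadata under the
table base path):

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.PathFilter;

  // Toy filter: accept a parquet file only if it belongs to the latest
  // committed file slice of its file group.
  public class LatestSliceOnlyFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
      return isInLatestFileSlice(path);
    }

    // Placeholder so the sketch compiles; a real implementation would read
    // the Hudi timeline to decide this.
    private boolean isInLatestFileSlice(Path path) {
      return true;
    }
  }

Spark's file listing picks up whatever filter is registered under
mapreduce.input.pathFilter.class (via FileInputFormat), which is why setting
the Hudi filter, as in the spark-shell line quoted further below, removes the
duplicates.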

It may take some time for you to go through these paths and digest the flow.
But should you still have any questions, please do not hesitate to revert back.

Hope this helps
Kabeer.
 
 

Sent: Monday, November 18, 2019 at 7:47 PM
From: "Bhavani Sudha" 
To: dev@hudi.apache.org
Subject: Re: Spark v2.3.2 : Duplicate entries found for each primary Key
Hi Pratyaksh,

Let me try to answer this. I believe Spark does not natively invoke
HoodieParquetInputFormat.getSplits() like Hive and Presto do. So when
queried, Spark just loads all the data files in that partition without
applying the Hoodie filtering logic. That's why we need to instruct Spark to
read in the appropriate format in one of the two ways suggested by
Vinoth/Kabeer earlier.
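
For reference, a rough sketch of the path-filter route in the Java API (the
app and class names here are made up; the table is the one from the thread
below, and the spark-shell one-liner quoted further down is the Scala
equivalent):

  import org.apache.hadoop.fs.PathFilter;
  import org.apache.hudi.hadoop.HoodieROTablePathFilter;
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SparkSession;

  public class HudiReadOptimizedQuery {
    public static void main(String[] args) {
      SparkSession spark = SparkSession.builder()
          .appName("hudi-ro-query")
          .enableHiveSupport()
          .getOrCreate();

      // Register the Hudi path filter so that file listing keeps only the
      // latest parquet file per file group instead of every version.
      spark.sparkContext().hadoopConfiguration().setClass(
          "mapreduce.input.pathFilter.class",
          HoodieROTablePathFilter.class,
          PathFilter.class);

      // Without the filter above, Spark reads every parquet file version,
      // which is what produces the duplicate rows discussed in this thread.
      Dataset<Row> latest = spark.sql("SELECT * FROM inventory.customer");
      latest.show(10, false);
    }
  }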

Thanks,
Sudha

On Mon, Nov 18, 2019 at 12:16 AM Pratyaksh Sharma 
wrote:

> Hi Vinoth/Kabeer,
>
> I have one small doubt regarding what you proposed to fix the issue. Why is
> the HoodieParquetInputFormat class not able to handle deduplication of
> records in the case of Spark, while it is able to do so in the case of
> Presto and Hive?
>
> On Sun, Nov 17, 2019 at 4:08 AM Vinoth Chandar  wrote:
>
> > Sweet!
> >
> > On Sat, Nov 16, 2019 at 10:16 AM Purushotham Pushpavanthar <
> > pushpavant...@gmail.com> wrote:
> >
> > > Thanks Vinoth and Kabeer. It resolved my problem.
> > >
> > > Regards,
> > > Purushotham Pushpavanth
> > >
> > >
> > >
> > > On Fri, 15 Nov 2019 at 20:16, Kabeer Ahmed 
> wrote:
> > >
> > > > Adding to Vinoth's response, in spark-shell you just need to copy and
> > > > paste the below line. Let us know if it still doesn't work.
> > > >
> > > >
> > > > spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
> > > >   classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
> > > >   classOf[org.apache.hadoop.fs.PathFilter]);
> > > > On Nov 15 2019, at 1:37 pm, Vinoth Chandar 
> wrote:
> > > > > Hi,
> > > > >
> > > > > are you setting the path filters when you query the Hudi Hive table
> > > > > via Spark
> > > > > http://hudi.apache.org/querying_data.html#spark-ro-view (or
> > > > > http://hudi.apache.org/querying_data.html#spark-rt-view
> > > > > alternatively)?
> > > > >
> > > > > - Vinoth
> > > > > On Fri, Nov 15, 2019 at 5:03 AM Purushotham Pushpavanthar <
> > > > > pushpavant...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > > Below is a create statement on my Hudi dataset.
> > > > > >
> > > > > > CREATE EXTERNAL TABLE `inventory`.`customer`(
> > > > > >   `_hoodie_commit_time` string, `_hoodie_commit_seqno` string,
> > > > > >   `_hoodie_record_key` string, `_hoodie_partition_path` string,
> > > > > >   `_hoodie_file_name` string, `id` bigint, `sales` bigint,
> > > > > >   `merchant` bigint, `item_status` bigint, `tem_shipment` bigint)
> > > > > > PARTITIONED BY (`dt` string)
> > > > > > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> > > > > > WITH SERDEPROPERTIES ('serialization.format' = '1')
> > > > > > STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
> > > > > > OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> > > > > > LOCATION 's3:inventory/customer'
> > > > > > TBLPROPERTIES ('bucketing_version' = '2', 'transient_lastDdlTime' = '1572952974',
> > > > > >   'last_commit_time_sync' = '20191114192136')
> > > > > >
> > > > > > I've taken care of adding hudi-hive-bundle-0.5.1-SNAPSHOT.jar in
> > > > > > Hive,
> > 

Re: Small clarification in Hoodie Cleaner flow

2019-11-19 Thread Balaji Varadarajan
I updated the FAQ section to set defaults correctly and add more
information related to this :
https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-WhatdoestheHudicleanerdo

The cleaner retention configuration is based on counts (number of commits
to be retained), with the assumption that users need to provide a
conservative number. The historical reason was that ingestion used to run
at a specific cadence (e.g. every 30 mins), with the norm being an ingestion
run taking less than 30 mins. With this model, it was simpler to represent
the configuration as a count of commits to approximate the retention time.
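
As a rough back-of-the-envelope illustration (the ~8 min figure is just the
DeltaSync iteration time Pratyaksh mentioned, not a default):

  retention window ~= commits retained x time per commit
  24 commits x 30 mins/commit  = 12 hours     (the old fixed-cadence assumption)
  10 commits x ~8 mins/commit  = ~80 minutes  (continuous-mode DeltaSync at that pace)

Any query expected to run longer than that window needs a correspondingly
larger retained-commit count.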

With delta-streamer continuous mode, ingestion is allowed to be scheduled
immediately after the previous run is scheduled. I think it would make
sense to introduce a time based retention. I have created a newbie ticket
for this : https://jira.apache.org/jira/browse/HUDI-349

Pratyaksh, in sum, if the defaults are too low, use a conservative number
based on the number of ingestion runs you see in your setup. The defaults
as referenced in the code comments need to change (from 24 to 10):
https://jira.apache.org/jira/browse/HUDI-350

Thanks,
Balaji.V

On Tue, Nov 19, 2019 at 1:40 AM Pratyaksh Sharma 
wrote:

> Hi,
>
> We are assuming the following in the getDeletePaths() method in the cleaner
> flow, in the case of the KEEP_LATEST_COMMITS policy -
>
> /**
>  * Selects the versions of a file for cleaning, such that it:
>  * - Leaves the latest version of the file untouched.
>  * - For older versions, it leaves untouched all the commits that occurred in
>  *   the last config.getCleanerCommitsRetained() commits, and ONE commit
>  *   before this window.
>  * We assume that max(query execution time) == commit_batch_time * config.getCleanerCommitsRetained().
>  * This is 12 hours by default. This is essential to leave the file used by
>  * the query that runs for the max time.
>  *
>  * This provides the effect of having lookback into all changes that happened
>  * in the last X commits (e.g. if you retain 24 commits, and commit batch time
>  * is 30 mins, then you have 12 hrs of lookback).
>  *
>  * This policy is the default.
>  */
>
> I want to understand the term commit_batch_time in this assumption, and the
> assumption as a whole. As per my understanding, this term refers to the
> time taken by one end-to-end iteration of DeltaSync (which is hardly 7-8
> minutes in my case). If my understanding is correct, then this time will
> vary depending on the size of the incoming RDD, so the time allowed for the
> longest query is effectively a variable. In that case, what is a safe value
> to keep for config.getCleanerCommitsRetained()?
>
> Basically, I want to set config.getCleanerCommitsRetained() properly for my
> Hudi instance, and hence I am trying to understand the assumption. Its
> default value is 10; I want to understand if this can be reduced further
> without any query failing.
>
> Please help me with this.
>
> Regards
> Pratyaksh
>


Re: Apache project maturity model

2019-11-19 Thread Vinoth Chandar
Thanks Thomas! Will read it over and file tickets against the "release"
component.

On Sat, Nov 16, 2019 at 1:57 PM Thomas Weise  wrote:

> Hi,
>
> The maturity model is an (optional) framework for evaluating the project. I
> would recommend taking a look and checking if there are focus areas for the
> path towards graduation:
>
> https://community.apache.org/apache-way/apache-project-maturity-model.html
>
> Thanks,
> Thomas
>


Small clarification in Hoodie Cleaner flow

2019-11-19 Thread Pratyaksh Sharma
Hi,

We are assuming the following in the getDeletePaths() method in the cleaner
flow, in the case of the KEEP_LATEST_COMMITS policy -

/**
 * Selects the versions of a file for cleaning, such that it:
 * - Leaves the latest version of the file untouched.
 * - For older versions, it leaves untouched all the commits that occurred in
 *   the last config.getCleanerCommitsRetained() commits, and ONE commit before
 *   this window.
 * We assume that max(query execution time) == commit_batch_time * config.getCleanerCommitsRetained().
 * This is 12 hours by default. This is essential to leave the file used by the
 * query that runs for the max time.
 *
 * This provides the effect of having lookback into all changes that happened
 * in the last X commits (e.g. if you retain 24 commits, and commit batch time
 * is 30 mins, then you have 12 hrs of lookback).
 *
 * This policy is the default.
 */

I want to understand the term commit_batch_time in this assumption, and the
assumption as a whole. As per my understanding, this term refers to the
time taken by one end-to-end iteration of DeltaSync (which is hardly 7-8
minutes in my case). If my understanding is correct, then this time will
vary depending on the size of the incoming RDD, so the time allowed for the
longest query is effectively a variable. In that case, what is a safe value
to keep for config.getCleanerCommitsRetained()?

Basically, I want to set config.getCleanerCommitsRetained() properly for my
Hudi instance, and hence I am trying to understand the assumption. Its
default value is 10; I want to understand if this can be reduced further
without any query failing.

Please help me with this.

Regards
Pratyaksh