Pratyaksh, 

+1 to what Sudha has written. Let's zoom in a bit closer. 
For Hive, as you said, we explicitly set the input format to 
HoodieParquetInputFormat.
- HoodieParquetInputFormat extends MapredParquetInputFormat, which is nothing 
but an input format for Hive. Hive and Presto depend on this class to retrieve 
the dataset from Hudi.

For Spark, there is no such option to set this explicitly. Rather, Spark starts 
reading the paths directly from the file system (HDFS or S3). From Spark, the 
calls would be as below:
- org.apache.spark.rdd.NewHadoopRDD.getPartitions
- org.apache.parquet.hadoop.ParquetInputFormat.getSplits
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits

Now it is evident that we can't plug in the HoodieParquetInputFormat here. 
Rather, we rely on the PathFilter class, which allows us to filter out the paths 
(and files). So we explicitly set this in the Spark Hadoop Configuration (note 
that Spark uses the Hadoop FS S3 implementation to read from S3).
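
For reference, a minimal sketch of that setting in spark-shell. It assumes the 
shell was started with the hudi-spark-bundle jar on the classpath; the property 
name and classes are the ones already given earlier in this thread, and the 
query at the end is only an illustrative example against the table from the 
create statement.

```scala
// Run inside spark-shell (assumes the hudi-spark-bundle jar is on the classpath).
import org.apache.hadoop.fs.PathFilter
import org.apache.hudi.hadoop.HoodieROTablePathFilter

// Register the Hudi path filter in the Hadoop configuration that Spark uses
// when it lists input files.
spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[HoodieROTablePathFilter],
  classOf[PathFilter])

// Illustrative query only; the table name comes from the create statement in this thread.
spark.sql("select count(*) from inventory.customer").show()
```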

If you look into the file HoodieROTablePathFilter, you will see that there is 
logic to ensure that, for Hoodie-related folders (paths) and files, only the 
latest path/file is selected. Thus you do not see duplicate entries when you 
set it. Without this, Spark just plainly reads all the parquet files and 
displays the data within them. 
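
To make the idea concrete, here is a small self-contained sketch of "keep only 
the latest file per file group". This is not the HoodieROTablePathFilter code. 
BaseFile and LatestFileSketch are hypothetical names, and the file name layout 
<fileId>_<writeToken>_<commitTime>.parquet is an assumption made for 
illustration. The real class does more than this, so treat it only as a model 
of the selection step.

```scala
// Hypothetical, simplified model of a data file; not a Hudi class.
final case class BaseFile(fileId: String, commitTime: String, name: String)

object LatestFileSketch {
  // Assumed name layout: <fileId>_<writeToken>_<commitTime>.parquet
  def parse(name: String): Option[BaseFile] = {
    val parts = name.stripSuffix(".parquet").split("_")
    if (name.endsWith(".parquet") && parts.length == 3)
      Some(BaseFile(parts(0), parts(2), name))
    else None
  }

  // Group files by fileId and keep the one with the greatest commit time.
  // Commit times are fixed-width timestamps, so string comparison orders them.
  def latestPerFileGroup(names: Seq[String]): Set[String] =
    names.flatMap(parse)
      .groupBy(_.fileId)
      .values
      .map(_.maxBy(_.commitTime).name)
      .toSet

  // Same shape as PathFilter.accept: true if the file should be read.
  def accept(name: String, allInPartition: Seq[String]): Boolean =
    latestPerFileGroup(allInPartition).contains(name)
}
```

With two versions of file group f1 in a partition, only the newer one is 
accepted, which is why an updated key shows up once instead of twice. Without 
the filter, both versions are read and every updated key appears in each of 
them.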

It may take some time for you to go through these paths and digest the flow. 
But should you still have any questions, please do not hesitate to get back to us. 

Hope this helps
Kabeer.
 
 

Sent: Monday, November 18, 2019 at 7:47 PM
From: "Bhavani Sudha" <bhavanisud...@gmail.com>
To: dev@hudi.apache.org
Subject: Re: Spark v2.3.2 : Duplicate entries found for each primary Key
Hi Pratyaksh,

Let me try to answer this. I believe Spark does not natively invoke
HoodieParquetInputFormat.getSplits() like Hive and Presto do. So when
queried, Spark just loads all the data files in that partition without
applying Hoodie filtering logic. That's why we need to instruct Spark to
read in the appropriate format in one of the two ways suggested by
Vinoth/Kabeer earlier.

Thanks,
Sudha

On Mon, Nov 18, 2019 at 12:16 AM Pratyaksh Sharma <pratyaks...@gmail.com>
wrote:

> Hi Vinoth/Kabeer,
>
> I have one small doubt regarding what you proposed to fix the issue. Why is
> HoodieParquetInputFormat class not able to handle deduplication of records
> in case of spark while it is able to do so in case of presto and hive?
>
> On Sun, Nov 17, 2019 at 4:08 AM Vinoth Chandar <vin...@apache.org> wrote:
>
> > Sweet!
> >
> > On Sat, Nov 16, 2019 at 10:16 AM Purushotham Pushpavanthar <
> > pushpavant...@gmail.com> wrote:
> >
> > > Thanks Vinoth and Kabeer. It resolved my problem.
> > >
> > > Regards,
> > > Purushotham Pushpavanth
> > >
> > >
> > >
> > > On Fri, 15 Nov 2019 at 20:16, Kabeer Ahmed <kab...@linuxmail.org> wrote:
> > >
> > > > > Adding to Vinoth's response, in spark-shell you just need to copy and
> > > > > paste the below line. Let us know if it still doesn't work.
> > > > >
> > > > > spark.sparkContext.hadoopConfiguration.setClass(
> > > > >   "mapreduce.input.pathFilter.class",
> > > > >   classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
> > > > >   classOf[org.apache.hadoop.fs.PathFilter]);
> > > > >
> > > > > On Nov 15 2019, at 1:37 pm, Vinoth Chandar <vin...@apache.org> wrote:
> > > > > Hi,
> > > > >
> > > > > > Are you setting the path filters when you query the Hudi Hive table via
> > > > > > Spark
> > > > > > http://hudi.apache.org/querying_data.html#spark-ro-view (or
> > > > > > http://hudi.apache.org/querying_data.html#spark-rt-view alternatively)?
> > > > >
> > > > > - Vinoth
> > > > > On Fri, Nov 15, 2019 at 5:03 AM Purushotham Pushpavanthar <
> > > > > pushpavant...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > > > Below is a create statement on my Hudi dataset.
> > > > > > >
> > > > > > > CREATE EXTERNAL TABLE `inventory`.`customer`(
> > > > > > >   `_hoodie_commit_time` string,
> > > > > > >   `_hoodie_commit_seqno` string,
> > > > > > >   `_hoodie_record_key` string,
> > > > > > >   `_hoodie_partition_path` string,
> > > > > > >   `_hoodie_file_name` string,
> > > > > > >   `id` bigint,
> > > > > > >   `sales` bigint,
> > > > > > >   `merchant` bigint,
> > > > > > >   `item_status` bigint,
> > > > > > >   `tem_shipment` bigint)
> > > > > > > PARTITIONED BY (`dt` string)
> > > > > > > ROW FORMAT SERDE
> > > > > > >   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> > > > > > > WITH SERDEPROPERTIES ('serialization.format' = '1')
> > > > > > > STORED AS INPUTFORMAT
> > > > > > >   'org.apache.hudi.hadoop.HoodieParquetInputFormat'
> > > > > > > OUTPUTFORMAT
> > > > > > >   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> > > > > > > LOCATION
> > > > > > >   's3://<warehouse-bucket>/<path>/inventory/customer'
> > > > > > > TBLPROPERTIES (
> > > > > > >   'bucketing_version' = '2',
> > > > > > >   'transient_lastDdlTime' = '1572952974',
> > > > > > >   'last_commit_time_sync' = '20191114192136')
> > > > > >
> > > > > > > I've taken care of adding *hudi-hive-bundle-0.5.1-SNAPSHOT.jar* in
> > > > > > > Hive, *hudi-presto-bundle-0.5.1-SNAPSHOT.jar* in Presto and
> > > > > > > *hudi-spark-bundle-0.5.1-SNAPSHOT.jar* in Spark (all three share a
> > > > > > > common Metastore).
> > > > > > > We are running Hudi in COW mode and we noticed that there are multiple
> > > > > > > versions of the .parquet files written per partition, depending on the
> > > > > > > number of updates coming to them over each batch execution. When
> > > > > > > queried from Hive and Presto for any primary key having multiple
> > > > > > > updates, it returns a single record with the latest state (I assume
> > > > > > > *HoodieParquetInputFormat* does the magic of taking care of
> > > > > > > duplicates). Whereas, when I tried to execute the same query in Spark
> > > > > > > SQL, I get duplicated records for any primary key having multiple
> > > > > > > updates.
> > > > > >
> > > > > > > Can someone help me understand why Spark is not able to handle
> > > > > > > deduplication of records across multiple commits which Presto and Hive
> > > > > > > are able to do?
> > > > > > > I've taken care of providing hudi-spark-bundle-0.5.1-SNAPSHOT.jar while
> > > > > > > starting spark-shell. Is there something that I'm missing?
> > > > > >
> > > > > > Thanks in advance.
> > > > > > Regards,
> > > > > > Purushotham Pushpavanth
> > > > >
> > > > >
> > > >
> > > >
> > >
> >
>
