Pratyaksh, +1 to what Sudha has written. Let's zoom in a bit closer. For Hive, as you said, we explicitly set the input format to HoodieParquetInputFormat. HoodieParquetInputFormat extends MapredParquetInputFormat, which is simply an input format for Hive. Hive and Presto depend on this input format to retrieve the dataset from Hudi.
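For reference, the relevant part of the Hive DDL that Hudi's hive sync registers looks roughly like the sketch below (a minimal sketch only; the database, table and column names and the S3 location here are placeholders - the full statement Purushotham shared is quoted at the bottom of this thread):

CREATE EXTERNAL TABLE `my_db`.`my_hudi_table` (
  `_hoodie_commit_time` string,
  `_hoodie_record_key` string,
  `id` bigint)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://<bucket>/<path>/my_hudi_table';

Because Hive and Presto go through this input format's getSplits() (as Sudha mentioned), only the latest parquet file per file group ends up in the splits, which is why they do not show duplicates.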
For Spark, there is no such option to set this explicitly. Rather, Spark starts reading the paths directly from the file system (HDFS or S3). From Spark the calls would be as below:

- org.apache.spark.rdd.NewHadoopRDD.getPartitions
- org.apache.parquet.hadoop.ParquetInputFormat.getSplits
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits

Now it is evident that we cannot plug HoodieParquetInputFormat into this path. Instead, we rely on the PathFilter class, which allows us to filter out paths (and files). So we explicitly set this in the Spark Hadoop configuration (note that Spark uses the Hadoop FS S3 implementation to read from S3). If you look into the class HoodieROTablePathFilter, you will see that there is logic to ensure that, for Hudi-related folders (paths) and files, only the latest path/file is selected. Thus you do not see duplicate entries once this filter is set (see the spark-shell snippet quoted further down). Without it, Spark just plainly reads all the parquet files and displays the data within them.

It may take some time for you to go through these paths and digest the flow. But should you still have any questions, please do not hesitate to revert back.

Hope this helps.
Kabeer.

Sent: Monday, November 18, 2019 at 7:47 PM
From: "Bhavani Sudha" <bhavanisud...@gmail.com>
To: dev@hudi.apache.org
Subject: Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Hi Pratyaksh,

Let me try to answer this. I believe Spark does not natively invoke HoodieParquetInputFormat.getSplits() like Hive and Presto do. So when queried, Spark just loads all the data files in that partition without applying the Hudi filtering logic. That's why we need to instruct Spark to read in the appropriate format in one of the two ways suggested by Vinoth/Kabeer earlier.

Thanks,
Sudha

On Mon, Nov 18, 2019 at 12:16 AM Pratyaksh Sharma <pratyaks...@gmail.com> wrote:

> Hi Vinoth/Kabeer,
>
> I have one small doubt regarding what you proposed to fix the issue. Why is
> HoodieParquetInputFormat class not able to handle deduplication of records
> in case of Spark while it is able to do so in case of Presto and Hive?
>
> On Sun, Nov 17, 2019 at 4:08 AM Vinoth Chandar <vin...@apache.org> wrote:
>
> > Sweet!
> >
> > On Sat, Nov 16, 2019 at 10:16 AM Purushotham Pushpavanthar <
> > pushpavant...@gmail.com> wrote:
> >
> > > Thanks Vinoth and Kabeer. It resolved my problem.
> > >
> > > Regards,
> > > Purushotham Pushpavanth
> > >
> > > On Fri, 15 Nov 2019 at 20:16, Kabeer Ahmed <kab...@linuxmail.org> wrote:
> > >
> > > > Adding to Vinoth's response, in spark-shell you just need to copy and
> > > > paste the below line. Let us know if it still doesn't work.
> > > >
> > > > spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
> > > >   classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
> > > >   classOf[org.apache.hadoop.fs.PathFilter]);
> > > >
> > > > On Nov 15 2019, at 1:37 pm, Vinoth Chandar <vin...@apache.org> wrote:
> > > > > Hi,
> > > > >
> > > > > are you setting the path filters when you query the Hudi Hive table via
> > > > > Spark (http://hudi.apache.org/querying_data.html#spark-ro-view, or
> > > > > http://hudi.apache.org/querying_data.html#spark-rt-view alternatively)?
> > > > >
> > > > > - Vinoth
> > > > >
> > > > > On Fri, Nov 15, 2019 at 5:03 AM Purushotham Pushpavanthar <
> > > > > pushpavant...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Below is a create statement on my Hudi dataset.
> > > > > > CREATE EXTERNAL TABLE `inventory`.`customer`(
> > > > > >   `_hoodie_commit_time` string,
> > > > > >   `_hoodie_commit_seqno` string,
> > > > > >   `_hoodie_record_key` string,
> > > > > >   `_hoodie_partition_path` string,
> > > > > >   `_hoodie_file_name` string,
> > > > > >   `id` bigint,
> > > > > >   `sales` bigint,
> > > > > >   `merchant` bigint,
> > > > > >   `item_status` bigint,
> > > > > >   `tem_shipment` bigint)
> > > > > > PARTITIONED BY (`dt` string)
> > > > > > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> > > > > > WITH SERDEPROPERTIES ('serialization.format' = '1')
> > > > > > STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
> > > > > > OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> > > > > > LOCATION 's3://<warehouse-bucket>/<path>/inventory/customer'
> > > > > > TBLPROPERTIES (
> > > > > >   'bucketing_version' = '2',
> > > > > >   'transient_lastDdlTime' = '1572952974',
> > > > > >   'last_commit_time_sync' = '20191114192136')
> > > > > >
> > > > > > I've taken care of adding hudi-hive-bundle-0.5.1-SNAPSHOT.jar in Hive,
> > > > > > hudi-presto-bundle-0.5.1-SNAPSHOT.jar in Presto and
> > > > > > hudi-spark-bundle-0.5.1-SNAPSHOT.jar in Spark (all three share a common Metastore).
> > > > > > We are running Hudi in COW mode and we noticed that there are multiple
> > > > > > versions of the .parquet files written per partition, depending on the number
> > > > > > of updates coming to them over each batch execution. When queried from Hive
> > > > > > and Presto for any Primary Key having multiple updates, it returns a single
> > > > > > record with the latest state (I assume HoodieParquetInputFormat does the magic
> > > > > > of taking care of duplicates). Whereas, when I tried to execute the same query
> > > > > > in Spark SQL, I get duplicated records for any Primary Key having multiple
> > > > > > updates.
> > > > > >
> > > > > > Can someone help me understand why Spark is not able to handle deduplication
> > > > > > of records across multiple commits which Presto and Hive are able to do?
> > > > > > I've taken care of providing hudi-spark-bundle-0.5.1-SNAPSHOT.jar while
> > > > > > starting spark-shell. Is there something that I'm missing?
> > > > > >
> > > > > > Thanks in advance.
> > > > > >
> > > > > > Regards,
> > > > > > Purushotham Pushpavanth