Figured it out.
The command below worked for me in PySpark.

spark._jsc.hadoopConfiguration().set('mapreduce.input.pathFilter.class', 'org.apache.hudi.hadoop.HoodieROTablePathFilter')
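
In case it helps others, here is the same call wrapped as a small function. This is only a minimal sketch: the property name and the filter class are the ones quoted in this thread, while the helper name `register_hudi_path_filter` and the two constants are made up for illustration and are not part of Hudi or Spark. It assumes the hudi-spark-bundle jar is already on the classpath.

```python
# Property name and filter class exactly as quoted in this thread.
PATH_FILTER_KEY = "mapreduce.input.pathFilter.class"
HUDI_RO_PATH_FILTER = "org.apache.hudi.hadoop.HoodieROTablePathFilter"


def register_hudi_path_filter(spark):
    """Set the Hudi read-optimized path filter on the session's Hadoop conf.

    Hypothetical helper (not a Hudi or Spark API). `spark` is expected to be
    a SparkSession; the value read back from the configuration is returned
    so the caller can confirm what was set.
    """
    hadoop_conf = spark._jsc.hadoopConfiguration()
    hadoop_conf.set(PATH_FILTER_KEY, HUDI_RO_PATH_FILTER)
    return hadoop_conf.get(PATH_FILTER_KEY)


# Intended use in a pyspark shell where `spark` already exists:
#
#   register_hudi_path_filter(spark)
#   spark.sql("SELECT id, sales FROM inventory.customer").show()
```

The filter should be registered before the table is queried, since it is read from the Hadoop configuration when the input files are listed.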

Regards,
Purushotham Pushpavanth



On Mon, 18 Nov 2019 at 16:47, Purushotham Pushpavanthar <
[email protected]> wrote:

> Kabeer, can you please share the *PySpark* command to register the path filter
> class?
>
> Regards,
> Purushotham Pushpavanth
>
>
>
> On Mon, 18 Nov 2019 at 13:46, Pratyaksh Sharma <[email protected]>
> wrote:
>
>> Hi Vinoth/Kabeer,
>>
>> I have one small question about the fix you proposed. Why is the
>> HoodieParquetInputFormat class not able to handle deduplication of records
>> in Spark, when it is able to do so in Presto and Hive?
>>
>> On Sun, Nov 17, 2019 at 4:08 AM Vinoth Chandar <[email protected]> wrote:
>>
>> > Sweet!
>> >
>> > On Sat, Nov 16, 2019 at 10:16 AM Purushotham Pushpavanthar <
>> > [email protected]> wrote:
>> >
>> > > Thanks Vinoth and Kabeer. It resolved my problem.
>> > >
>> > > Regards,
>> > > Purushotham Pushpavanth
>> > >
>> > >
>> > >
>> > > On Fri, 15 Nov 2019 at 20:16, Kabeer Ahmed <[email protected]> wrote:
>> > >
>> > > > Adding to Vinoth's response, in spark-shell you just need to copy and
>> > > > paste the line below. Let us know if it still doesn't work.
>> > > >
>> > > > spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
>> > > >   classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
>> > > >   classOf[org.apache.hadoop.fs.PathFilter]);
>> > > >
>> > > > On Nov 15 2019, at 1:37 pm, Vinoth Chandar <[email protected]> wrote:
>> > > > > Hi,
>> > > > >
>> > > > > are you setting the path filters when you query the Hudi Hive table via
>> > > > > Spark
>> > > > > http://hudi.apache.org/querying_data.html#spark-ro-view (or
>> > > > > http://hudi.apache.org/querying_data.html#spark-rt-view alternatively)?
>> > > > >
>> > > > > - Vinoth
>> > > > > On Fri, Nov 15, 2019 at 5:03 AM Purushotham Pushpavanthar <
>> > > > > [email protected]> wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > > Below is a create statement on my Hudi dataset.
>> > > > > >
>> > > > > > CREATE EXTERNAL TABLE `inventory`.`customer`(
>> > > > > >   `_hoodie_commit_time` string,
>> > > > > >   `_hoodie_commit_seqno` string,
>> > > > > >   `_hoodie_record_key` string,
>> > > > > >   `_hoodie_partition_path` string,
>> > > > > >   `_hoodie_file_name` string,
>> > > > > >   `id` bigint,
>> > > > > >   `sales` bigint,
>> > > > > >   `merchant` bigint,
>> > > > > >   `item_status` bigint,
>> > > > > >   `tem_shipment` bigint)
>> > > > > > PARTITIONED BY (`dt` string)
>> > > > > > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
>> > > > > > WITH SERDEPROPERTIES ('serialization.format' = '1')
>> > > > > > STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
>> > > > > > OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
>> > > > > > LOCATION 's3://<warehouse-bucket>/<path>/inventory/customer'
>> > > > > > TBLPROPERTIES (
>> > > > > >   'bucketing_version' = '2',
>> > > > > >   'transient_lastDdlTime' = '1572952974',
>> > > > > >   'last_commit_time_sync' = '20191114192136')
>> > > > > >
>> > > > > > I've taken care of adding *hudi-hive-bundle-0.5.1-SNAPSHOT.jar* in Hive,
>> > > > > > *hudi-presto-bundle-0.5.1-SNAPSHOT.jar* in Presto and
>> > > > > > *hudi-spark-bundle-0.5.1-SNAPSHOT.jar* in Spark (all three share a common
>> > > > > > Metastore).
>> > > > > > We are running Hudi in COW mode and we noticed that multiple versions of
>> > > > > > the .parquet files are written per partition, depending on the number of
>> > > > > > updates arriving in each batch execution. When queried from Hive and
>> > > > > > Presto, any primary key with multiple updates returns a single record
>> > > > > > with the latest state (I assume *HoodieParquetInputFormat* does the magic
>> > > > > > of taking care of duplicates). However, when I execute the same query in
>> > > > > > Spark SQL, I get duplicate records for any primary key with multiple
>> > > > > > updates.
>> > > > > >
>> > > > > > Can someone help me understand why Spark is not able to handle
>> > > > > > deduplication of records across multiple commits, which Presto and Hive
>> > > > > > are able to do?
>> > > > > > I've taken care of providing hudi-spark-bundle-0.5.1-SNAPSHOT.jar while
>> > > > > > starting spark-shell. Is there something that I'm missing?
>> > > > > >
>> > > > > > Thanks in advance.
>> > > > > > Regards,
>> > > > > > Purushotham Pushpavanth
>> > > > >
>> > > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
