Hi,

yes the indexing DAG can support this today and even if not, it can be
easily fixed. Main issue would be how we encode the mapping well.
for e.g if we want map from user_id to all events that belong to the user,
we need a different, scalable way of storing this mapping.

I can organize this work under the 1.0 stream, if you are interested in
driving.

Thanks
Vinoth


On Thu, Jul 13, 2023 at 1:09 PM nicolas paris <nicolas.pa...@riseup.net>
wrote:

> Hello Prashant, thanks for your time.
>
>
> > With non unique keys how would tagging of records (for updates /
> deletes) work?
>
> Currently both GLOBAL_SIMPLE/BLOOM work out of the box in the mentioned
> context. See below pyspark script and results. As for the
> implementation, the tagLocationBacktoRecords returns a rdd of
> HoodieRecord with (key/part/location), and it can contain duplicate
> keys (then multiple records for same key).
>
> ```
> tableName = "test_global_bloom"
> basePath = f"/tmp/{tableName}"
>
> hudi_options = {
>     "hoodie.table.name": tableName,
>     "hoodie.datasource.write.recordkey.field": "event_id",
>     "hoodie.datasource.write.partitionpath.field": "part",
>     "hoodie.datasource.write.table.name": tableName,
>     "hoodie.datasource.write.precombine.field": "ts",
>     "hoodie.datasource.write.hive_style_partitioning": "true",
>     "hoodie.datasource.hive_sync.enable": "false",
>     "hoodie.metadata.enable": "true",
>     "hoodie.index.type": "GLOBAL_BLOOM", # GLOBAL_SIMPLE works as well
> }
>
> # LET'S GEN DUPLS
> mode="overwrite"
> df =spark.sql("""select '1' as event_id, '2' as ts, '2' as part UNION
>                  select '1' as event_id, '3' as ts, '3' as part UNION
>                  select '1' as event_id, '2' as ts, '3' as part UNION
>                  select '2' as event_id, '2' as ts, '3' as part""")
> df.write.format("hudi").options(**hudi_options).option("hoodie.datasour
> ce.write.operation", "BULK_INSERT").mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id",
> "ts","part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       1|  3|   3|
> # |       1|  2|   3|
> # |       2|  2|   3|
> # |       1|  2|   2|
> # +--------+---+----+
>
> # UPDATE
> mode="append"
> spark.sql("select '1' as event_id, '20' as ts, '4' as
> part").write.format("hudi").options(**hudi_options).option("hoodie.data
> source.write.operation", "UPSERT").mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id",
> "ts","part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       1| 20|   4|
> # |       1| 20|   4|
> # |       1| 20|   4|
> # |       2|  2|   3|
> # +--------+---+----+
>
> # DELETE
> mode="append"
> spark.sql("select 1 as
> event_id").write.format("hudi").options(**hudi_options).option("hoodie.
> datasource.write.operation", "DELETE").mode(mode).save(basePath)
> spark.read.format("hudi").load(basePath).select("event_id",
> "ts","part").show()
> # +--------+---+----+
> # |event_id| ts|part|
> # +--------+---+----+
> # |       2|  2|   3|
> # +--------+---+----+
> ```
>
>
> > How would record Index know which mapping of the array to
> return for a given record key?
>
> As well as GLOBAL_SIMPLE/BLOOM, for a given record key, the RLI would
> return a list of mapping. Then the operation (update, delete, FCOW ...)
> would apply to each location.
>
> To illustrate, we could get something like this in the MDT:
>
> |event_id:1|[
>              {part=2, -5811947225812876253, -6812062179961430298, 0,
> 1689147210233},
>              {part=3, -711947225812876253, -8812062179961430298, 1,
> 1689147210233},
>              {part=3, -1811947225812876253, -2812062179961430298, 0,
> 1689147210233}
>              ]|
>
>
> On Thu, 2023-07-13 at 10:17 -0700, Prashant Wason wrote:
> > Hi Nicolas,
> >
> > The RI feature is designed for max performance as it is at a record-
> > count
> > scale. Hence, the schema is simplified and minimized.
> >
> > With non unique keys how would tagging of records (for updates /
> > deletes)
> > work? How would record Index know which mapping of the array to
> > return for
> > a given record key?
> >
> > Thanks
> > Prashant
> >
> >
> >
> > On Wed, Jul 12, 2023 at 2:02 AM nicolas paris
> > <nicolas.pa...@riseup.net>
> > wrote:
> >
> > > hi there,
> > >
> > > Just tested preview of RLI (rfc-08), amazing feature. Soon the fast
> > > COW
> > > (rfc-68) will be based on RLI to get the parquet offsets and allow
> > > targeting parquet row groups.
> > >
> > > RLI is a global index, therefore it assumes the hudi key is present
> > > in
> > > at most one parquet file. As a result in the MDT, the RLI is of
> > > type
> > > struct, and there is a 1:1 mapping w/ a given file.
> > >
> > > Type:
> > >    |-- recordIndexMetadata: struct (nullable = true)
> > >    |    |-- partition: string (nullable = false)
> > >    |    |-- fileIdHighBits: long (nullable = false)
> > >    |    |-- fileIdLowBits: long (nullable = false)
> > >    |    |-- fileIndex: integer (nullable = false)
> > >    |    |-- instantTime: long (nullable = false)
> > >
> > > Content:
> > >    |event_id:1        |{part=3, -6811947225812876253,
> > > -7812062179961430298, 0, 1689147210233}|
> > >
> > > We would love to use both RLI and FCOW features, but I'm afraid our
> > > keys are not unique in our kafka archives. Same key might be
> > > present
> > > in multiple partitions, and even in multiple slices within
> > > partitions.
> > >
> > > I wonder if the future, RLI could support multiple parquet files
> > > (by
> > > storing an array of struct for eg). This would enable to leverage
> > > LRI
> > > in more contexts
> > >
> > > Thx
> > >
> > >
> > >
> > >
> > >
>
>

Reply via email to