leekeiabstraction commented on code in PR #290:
URL: https://github.com/apache/fluss-rust/pull/290#discussion_r2779695632
##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
print(f"Error with partitioned table: {e}")
traceback.print_exc()
+ # =====================================================
+ # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Partitioned KV Table ---")
+ print("=" * 60)
+
+ part_kv_fields = [
+ pa.field("region", pa.string()), # partition key + part of PK
+ pa.field("user_id", pa.int32()), # part of PK
+ pa.field("name", pa.string()),
+ pa.field("score", pa.int64()),
+ ]
+ part_kv_schema = pa.schema(part_kv_fields)
+ fluss_part_kv_schema = fluss.Schema(
+ part_kv_schema, primary_keys=["region", "user_id"]
+ )
+
+ part_kv_descriptor = fluss.TableDescriptor(
+ fluss_part_kv_schema,
+ partition_keys=["region"],
+ )
+
+ part_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+ try:
+ await admin.drop_table(part_kv_path, ignore_if_not_exists=True)
+ await admin.create_table(part_kv_path, part_kv_descriptor, False)
+ print(f"Created partitioned KV table: {part_kv_path}")
+
+ # Create partitions
+ await admin.create_partition(part_kv_path, {"region": "US"})
+ await admin.create_partition(part_kv_path, {"region": "EU"})
+ await admin.create_partition(part_kv_path, {"region": "APAC"})
+ print("Created partitions: US, EU, APAC")
+
+ part_kv_table = await conn.get_table(part_kv_path)
+ upsert_writer = part_kv_table.new_upsert()
+
+ # Upsert rows across partitions
+ test_data = [
+ ("US", 1, "Gustave", 100),
+ ("US", 2, "Lune", 200),
+ ("EU", 1, "Sciel", 150),
+ ("EU", 2, "Maelle", 250),
+ ("APAC", 1, "Noco", 300),
+ ]
+ for region, user_id, name, score in test_data:
+ upsert_writer.upsert({
+ "region": region, "user_id": user_id,
+ "name": name, "score": score,
+ })
+ await upsert_writer.flush()
+ print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+ # Lookup all rows across partitions
+ print("\n--- Lookup across partitions ---")
+ lookuper = part_kv_table.new_lookup()
+ for region, user_id, name, score in test_data:
+ result = await lookuper.lookup({"region": region, "user_id":
user_id})
+ assert result is not None, f"Expected to find region={region}
user_id={user_id}"
+ assert result["name"] == name, f"Name mismatch: {result['name']}
!= {name}"
+ assert result["score"] == score, f"Score mismatch:
{result['score']} != {score}"
+ print(f"All {len(test_data)} rows verified across partitions")
+
+ # Update within a partition
+ print("\n--- Update within partition ---")
+ handle = upsert_writer.upsert({
+ "region": "US", "user_id": 1,
+ "name": "Gustave Updated", "score": 999,
+ })
+ await handle.wait()
+ result = await lookuper.lookup({"region": "US", "user_id": 1})
+ assert result["name"] == "Gustave Updated"
+ assert result["score"] == 999
+ print(f"Update verified: US/1 name={result['name']}
score={result['score']}")
+
+ # Lookup in non-existent partition
+ print("\n--- Lookup in non-existent partition ---")
+ result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1})
+ assert result is None, "Expected UNKNOWN partition lookup to return
None"
+ print("UNKNOWN partition lookup: not found (expected)")
Review Comment:
Does this lookup currently return? If I recall correctly, we adhere to the Java
side, which effectively retries indefinitely on retriable errors,
PartitionNotFound being one of them.
##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
print(f"Error with partitioned table: {e}")
traceback.print_exc()
+ # =====================================================
+ # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+ # =====================================================
+ print("\n" + "=" * 60)
+ print("--- Testing Partitioned KV Table ---")
+ print("=" * 60)
+
+ part_kv_fields = [
+ pa.field("region", pa.string()), # partition key + part of PK
+ pa.field("user_id", pa.int32()), # part of PK
Review Comment:
Q: What do we mean by "part" here: part (as in subset) or partitioned?
`part_kv_fields` can be confusing. If I recall correctly, prefix lookup uses a
subset of the primary keys, so this name could be misconstrued. It seems like
we're using all of the primary keys here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]