leekeiabstraction commented on code in PR #290:
URL: https://github.com/apache/fluss-rust/pull/290#discussion_r2779695632


##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
         print(f"Error with partitioned table: {e}")
         traceback.print_exc()
 
+    # =====================================================
+    # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Partitioned KV Table ---")
+    print("=" * 60)
+
+    part_kv_fields = [
+        pa.field("region", pa.string()),   # partition key + part of PK
+        pa.field("user_id", pa.int32()),   # part of PK
+        pa.field("name", pa.string()),
+        pa.field("score", pa.int64()),
+    ]
+    part_kv_schema = pa.schema(part_kv_fields)
+    fluss_part_kv_schema = fluss.Schema(
+        part_kv_schema, primary_keys=["region", "user_id"]
+    )
+
+    part_kv_descriptor = fluss.TableDescriptor(
+        fluss_part_kv_schema,
+        partition_keys=["region"],
+    )
+
+    part_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+    try:
+        await admin.drop_table(part_kv_path, ignore_if_not_exists=True)
+        await admin.create_table(part_kv_path, part_kv_descriptor, False)
+        print(f"Created partitioned KV table: {part_kv_path}")
+
+        # Create partitions
+        await admin.create_partition(part_kv_path, {"region": "US"})
+        await admin.create_partition(part_kv_path, {"region": "EU"})
+        await admin.create_partition(part_kv_path, {"region": "APAC"})
+        print("Created partitions: US, EU, APAC")
+
+        part_kv_table = await conn.get_table(part_kv_path)
+        upsert_writer = part_kv_table.new_upsert()
+
+        # Upsert rows across partitions
+        test_data = [
+            ("US", 1, "Gustave", 100),
+            ("US", 2, "Lune", 200),
+            ("EU", 1, "Sciel", 150),
+            ("EU", 2, "Maelle", 250),
+            ("APAC", 1, "Noco", 300),
+        ]
+        for region, user_id, name, score in test_data:
+            upsert_writer.upsert({
+                "region": region, "user_id": user_id,
+                "name": name, "score": score,
+            })
+        await upsert_writer.flush()
+        print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+        # Lookup all rows across partitions
+        print("\n--- Lookup across partitions ---")
+        lookuper = part_kv_table.new_lookup()
+        for region, user_id, name, score in test_data:
+            result = await lookuper.lookup({"region": region, "user_id": 
user_id})
+            assert result is not None, f"Expected to find region={region} 
user_id={user_id}"
+            assert result["name"] == name, f"Name mismatch: {result['name']} 
!= {name}"
+            assert result["score"] == score, f"Score mismatch: 
{result['score']} != {score}"
+        print(f"All {len(test_data)} rows verified across partitions")
+
+        # Update within a partition
+        print("\n--- Update within partition ---")
+        handle = upsert_writer.upsert({
+            "region": "US", "user_id": 1,
+            "name": "Gustave Updated", "score": 999,
+        })
+        await handle.wait()
+        result = await lookuper.lookup({"region": "US", "user_id": 1})
+        assert result["name"] == "Gustave Updated"
+        assert result["score"] == 999
+        print(f"Update verified: US/1 name={result['name']} 
score={result['score']}")
+
+        # Lookup in non-existent partition
+        print("\n--- Lookup in non-existent partition ---")
+        result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1})
+        assert result is None, "Expected UNKNOWN partition lookup to return 
None"
+        print("UNKNOWN partition lookup: not found (expected)")

Review Comment:
   ~Does this currently return? If I recall correctly, we adhere to the Java 
side, which has effectively infinite retries for retryable errors, 
PartitionNotFound being one of them.~
   
   Sorry, I was thinking about the upsert path. Lookup should be OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to