fresh-borzoni commented on code in PR #290:
URL: https://github.com/apache/fluss-rust/pull/290#discussion_r2781843416


##########
bindings/python/example/example.py:
##########
@@ -759,6 +759,111 @@ async def main():
         print(f"Error with partitioned table: {e}")
         traceback.print_exc()
 
+    # =====================================================
+    # Demo: Partitioned KV Table (Upsert, Lookup, Delete)
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Partitioned KV Table ---")
+    print("=" * 60)
+
+    partitioned_kv_fields = [
+        pa.field("region", pa.string()),   # partition key + part of PK
+        pa.field("user_id", pa.int32()),   # part of PK
+        pa.field("name", pa.string()),
+        pa.field("score", pa.int64()),
+    ]
+    partitioned_kv_schema = pa.schema(partitioned_kv_fields)
+    fluss_partitioned_kv_schema = fluss.Schema(
+        partitioned_kv_schema, primary_keys=["region", "user_id"]
+    )
+
+    partitioned_kv_descriptor = fluss.TableDescriptor(
+        fluss_partitioned_kv_schema,
+        partition_keys=["region"],
+    )
+
+    partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py")
+
+    try:
+        await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True)
+        await admin.create_table(partitioned_kv_path, partitioned_kv_descriptor, False)
+        print(f"Created partitioned KV table: {partitioned_kv_path}")
+
+        # Create partitions
+        await admin.create_partition(partitioned_kv_path, {"region": "US"})
+        await admin.create_partition(partitioned_kv_path, {"region": "EU"})
+        await admin.create_partition(partitioned_kv_path, {"region": "APAC"})
+        print("Created partitions: US, EU, APAC")
+
+        partitioned_kv_table = await conn.get_table(partitioned_kv_path)
+        upsert_writer = partitioned_kv_table.new_upsert()
+
+        # Upsert rows across partitions
+        test_data = [
+            ("US", 1, "Gustave", 100),
+            ("US", 2, "Lune", 200),
+            ("EU", 1, "Sciel", 150),
+            ("EU", 2, "Maelle", 250),
+            ("APAC", 1, "Noco", 300),
+        ]
+        for region, user_id, name, score in test_data:
+            upsert_writer.upsert({
+                "region": region, "user_id": user_id,
+                "name": name, "score": score,
+            })
+        await upsert_writer.flush()
+        print(f"Upserted {len(test_data)} rows across 3 partitions")
+
+        # Lookup all rows across partitions
+        print("\n--- Lookup across partitions ---")
+        lookuper = partitioned_kv_table.new_lookup()
+        for region, user_id, name, score in test_data:
+            result = await lookuper.lookup({"region": region, "user_id": user_id})
+            assert result is not None, f"Expected to find region={region} user_id={user_id}"
+            assert result["name"] == name, f"Name mismatch: {result['name']} != {name}"
+            assert result["score"] == score, f"Score mismatch: {result['score']} != {score}"
+        print(f"All {len(test_data)} rows verified across partitions")
+
+        # Update within a partition
+        print("\n--- Update within partition ---")
+        handle = upsert_writer.upsert({
+            "region": "US", "user_id": 1,
+            "name": "Gustave Updated", "score": 999,
+        })
+        await handle.wait()
+        result = await lookuper.lookup({"region": "US", "user_id": 1})

Review Comment:
   valid



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to