luoyuxia commented on code in PR #239:
URL: https://github.com/apache/fluss-rust/pull/239#discussion_r2763902248


##########
bindings/python/example/example.py:
##########
@@ -249,6 +294,205 @@ async def main():
     except Exception as e:
         print(f"Error during scanning: {e}")
 
+    # =====================================================
+    # Demo: Primary Key Table with Lookup and Upsert
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Primary Key Table (Lookup & Upsert) ---")
+    print("=" * 60)
+
+    # Create a primary key table for lookup/upsert tests
+    # Include temporal and decimal types to test full conversion
+    pk_table_fields = [
+        pa.field("user_id", pa.int32()),
+        pa.field("name", pa.string()),
+        pa.field("email", pa.string()),
+        pa.field("age", pa.int32()),
+        pa.field("birth_date", pa.date32()),
+        pa.field("login_time", pa.time32("ms")),
+        pa.field("created_at", pa.timestamp("us")),  # TIMESTAMP (NTZ)
+        pa.field("updated_at", pa.timestamp("us", tz="UTC")),  # TIMESTAMP_LTZ
+        pa.field("balance", pa.decimal128(10, 2)),
+    ]
+    pk_schema = pa.schema(pk_table_fields)
+    fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"])
+
+    # Create table descriptor
+    pk_table_descriptor = fluss.TableDescriptor(
+        fluss_pk_schema,
+        num_buckets=3,

Review Comment:
   Got a warning from my IDE:
   ```
   Expected type 'str', got 'int' instead 
   ```



##########
bindings/python/example/example.py:
##########
@@ -249,6 +294,205 @@ async def main():
     except Exception as e:
         print(f"Error during scanning: {e}")
 
+    # =====================================================
+    # Demo: Primary Key Table with Lookup and Upsert
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Primary Key Table (Lookup & Upsert) ---")
+    print("=" * 60)
+
+    # Create a primary key table for lookup/upsert tests
+    # Include temporal and decimal types to test full conversion
+    pk_table_fields = [
+        pa.field("user_id", pa.int32()),
+        pa.field("name", pa.string()),
+        pa.field("email", pa.string()),
+        pa.field("age", pa.int32()),
+        pa.field("birth_date", pa.date32()),
+        pa.field("login_time", pa.time32("ms")),
+        pa.field("created_at", pa.timestamp("us")),  # TIMESTAMP (NTZ)
+        pa.field("updated_at", pa.timestamp("us", tz="UTC")),  # TIMESTAMP_LTZ
+        pa.field("balance", pa.decimal128(10, 2)),
+    ]
+    pk_schema = pa.schema(pk_table_fields)
+    fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"])
+
+    # Create table descriptor
+    pk_table_descriptor = fluss.TableDescriptor(
+        fluss_pk_schema,
+        num_buckets=3,
+    )
+
+    pk_table_path = fluss.TablePath("fluss", "users_pk_table_v3")
+
+    try:
+        await admin.create_table(pk_table_path, pk_table_descriptor, True)
+        print(f"Created PK table: {pk_table_path}")
+    except Exception as e:
+        print(f"PK Table creation failed (may already exist): {e}")
+
+    # Get the PK table
+    pk_table = await conn.get_table(pk_table_path)
+    print(f"Got PK table: {pk_table}")
+    print(f"Has primary key: {pk_table.has_primary_key()}")
+
+    # --- Test Upsert ---
+    print("\n--- Testing Upsert ---")
+    try:
+        upsert_writer = pk_table.new_upsert()

Review Comment:
   Got a warning from my IDE:
   ```
   Unresolved attribute reference 'new_upsert' for class 'FlussTable' 
   ```



##########
bindings/python/src/lookup.rs:
##########
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::table::{internal_row_to_dict, python_pk_to_generic_row};
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+/// Lookuper for performing primary key lookups on a Fluss table.
+///
+/// The Lookuper caches key encoders and bucketing functions, making
+/// repeated lookups efficient. Create once and reuse for multiple lookups.
+///
+/// Example:

Review Comment:
   ```suggestion
   /// # Example:
   ```
   Make the example more readable?



##########
bindings/python/example/example.py:
##########
@@ -249,6 +294,205 @@ async def main():
     except Exception as e:
         print(f"Error during scanning: {e}")
 
+    # =====================================================
+    # Demo: Primary Key Table with Lookup and Upsert
+    # =====================================================
+    print("\n" + "=" * 60)
+    print("--- Testing Primary Key Table (Lookup & Upsert) ---")
+    print("=" * 60)
+
+    # Create a primary key table for lookup/upsert tests
+    # Include temporal and decimal types to test full conversion
+    pk_table_fields = [
+        pa.field("user_id", pa.int32()),
+        pa.field("name", pa.string()),
+        pa.field("email", pa.string()),
+        pa.field("age", pa.int32()),
+        pa.field("birth_date", pa.date32()),
+        pa.field("login_time", pa.time32("ms")),
+        pa.field("created_at", pa.timestamp("us")),  # TIMESTAMP (NTZ)
+        pa.field("updated_at", pa.timestamp("us", tz="UTC")),  # TIMESTAMP_LTZ
+        pa.field("balance", pa.decimal128(10, 2)),
+    ]
+    pk_schema = pa.schema(pk_table_fields)
+    fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"])
+
+    # Create table descriptor
+    pk_table_descriptor = fluss.TableDescriptor(
+        fluss_pk_schema,
+        num_buckets=3,
+    )
+
+    pk_table_path = fluss.TablePath("fluss", "users_pk_table_v3")
+
+    try:
+        await admin.create_table(pk_table_path, pk_table_descriptor, True)
+        print(f"Created PK table: {pk_table_path}")
+    except Exception as e:
+        print(f"PK Table creation failed (may already exist): {e}")
+
+    # Get the PK table
+    pk_table = await conn.get_table(pk_table_path)
+    print(f"Got PK table: {pk_table}")
+    print(f"Has primary key: {pk_table.has_primary_key()}")
+
+    # --- Test Upsert ---
+    print("\n--- Testing Upsert ---")
+    try:
+        upsert_writer = pk_table.new_upsert()
+        print(f"Created upsert writer: {upsert_writer}")
+
+        await upsert_writer.upsert(
+            {
+                "user_id": 1,
+                "name": "Alice",
+                "email": "[email protected]",
+                "age": 25,
+                "birth_date": date(1999, 5, 15),
+                "login_time": dt_time(9, 30, 45, 123000),  # 09:30:45.123
+                "created_at": datetime(
+                    2024, 1, 15, 10, 30, 45, 123456
+                ),  # with microseconds
+                "updated_at": datetime(2024, 1, 15, 10, 30, 45, 123456),
+                "balance": Decimal("1234.56"),
+            }
+        )
+        print("Upserted user_id=1 (Alice)")
+
+        await upsert_writer.upsert(
+            {
+                "user_id": 2,
+                "name": "Bob",
+                "email": "[email protected]",
+                "age": 30,
+                "birth_date": date(1994, 3, 20),
+                "login_time": dt_time(14, 15, 30, 500000),  # 14:15:30.500
+                "created_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
+                "updated_at": datetime(2024, 1, 16, 11, 22, 33, 444555),
+                "balance": Decimal("5678.91"),
+            }
+        )
+        print("Upserted user_id=2 (Bob)")
+
+        await upsert_writer.upsert(
+            {
+                "user_id": 3,
+                "name": "Charlie",
+                "email": "[email protected]",
+                "age": 35,
+                "birth_date": date(1989, 11, 8),
+                "login_time": dt_time(16, 45, 59, 999000),  # 16:45:59.999
+                "created_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
+                "updated_at": datetime(2024, 1, 17, 23, 59, 59, 999999),
+                "balance": Decimal("9876.54"),
+            }
+        )
+        print("Upserted user_id=3 (Charlie)")
+
+        # Update an existing row (same PK, different values)
+        await upsert_writer.upsert(
+            {
+                "user_id": 1,
+                "name": "Alice Updated",
+                "email": "[email protected]",
+                "age": 26,
+                "birth_date": date(1999, 5, 15),
+                "login_time": dt_time(10, 11, 12, 345000),  # 10:11:12.345
+                "created_at": datetime(2024, 1, 15, 10, 30, 45, 123456),  # unchanged
+                "updated_at": datetime(
+                    2024, 1, 20, 15, 45, 30, 678901
+                ),  # new update time
+                "balance": Decimal("2345.67"),
+            }
+        )
+        print("Updated user_id=1 (Alice -> Alice Updated)")
+
+        # Explicit flush to ensure all upserts are acknowledged
+        await upsert_writer.flush()
+        print("Flushed all upserts")
+
+    except Exception as e:
+        print(f"Error during upsert: {e}")
+        import traceback
+
+        traceback.print_exc()
+
+    # --- Test Lookup ---
+    print("\n--- Testing Lookup ---")
+    try:
+        lookuper = pk_table.new_lookup()

Review Comment:
   Ditto:
   ```
   Unresolved attribute reference 'new_lookup' for class 'FlussTable' 
   ```



##########
bindings/python/src/upsert.rs:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::table::{python_pk_to_generic_row, python_to_generic_row};
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+/// Writer for upserting and deleting data in a Fluss primary key table.
+///
+/// Each upsert/delete operation is sent to the server and waits for acknowledgment.
+/// Multiple concurrent writers share a common WriterClient which batches requests
+/// for efficiency.
+///
+/// Example:

Review Comment:
   nit:
   ```
   /// # Example:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to