ShravanSunder commented on issue #3293:
URL: https://github.com/apache/arrow-adbc/issues/3293#issuecomment-3224368139
Hi @lidavidm,
I can share how I tested my solution. It exercises the kind of data I expected the driver to handle for JSONB (the Arrow JSON extension type); a minimal sketch of the shim's conversions follows the test code below.
```python
import typing as t

import pyarrow as pa
import pytest
from pydantic_core import from_json

from bills_ai.helpers.adbc_arrow_shim import prepare_arrow_table_for_pg_adbc
from voyager_lib.basemodel.arrow_table_base_model import ArrowTableBaseModel
from voyager_lib.database.configure_voyager_db import get_adbc_cursor


class EmbeddingRecord(ArrowTableBaseModel):
    id: int
    embedding: t.List[float]
    note: str


def generate_vectors(n_rows: int, dim: int) -> t.List[t.List[float]]:
    # Deterministic values without numpy
    scale = 0.001
    out: t.List[t.List[float]] = []
    for i in range(n_rows):
        row: t.List[float] = [float(((i * 131 + j * 17) % 1000) * scale) for j in range(dim)]
        out.append(row)
    return out


def coerce_id_dist(rows: t.Any) -> t.List[tuple[int, float]]:  # noqa: ANN401
    result: t.List[tuple[int, float]] = []
    typed_rows = t.cast(t.List[t.Tuple[t.Any, t.Any]], rows)
    for r in typed_rows:
        result.append((int(r[0]), float(r[1])))
    return result


@pytest.fixture
def setup_pgvector_tables(bills_voyager_db_session: None) -> t.Iterator[int]:
    dim = 1024
    with get_adbc_cursor() as cur:
        cur.execute("BEGIN")
        cur.execute("CREATE SCHEMA IF NOT EXISTS bills")
        cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
        cur.execute("DROP TABLE IF EXISTS emb_arrow")
        cur.execute(f"CREATE TABLE emb_arrow (id BIGINT PRIMARY KEY, embedding VECTOR({dim}), note TEXT)")
        cur.execute("COMMIT")
    try:
        yield dim
    finally:
        with get_adbc_cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS emb_arrow")
            cur.execute("COMMIT")


@pytest.mark.integration_db
def test_adbc_ingest_and_similarity_via_arrowtable_bytes(setup_pgvector_tables: int) -> None:  # noqa: PLR0915
    # Arrange
    dim = setup_pgvector_tables
    n_rows = 8

    # Generate test data, with id=1 having a special note
    all_vectors = generate_vectors(n_rows, dim)
    rows: t.List[EmbeddingRecord] = []
    for i, vec in enumerate(all_vectors):
        note = "note1" if i == 0 else "first_batch"
        rows.append(EmbeddingRecord(id=i + 1, embedding=vec, note=note))
    first_batch_table = EmbeddingRecord.pydantic_list_to_arrow_table(rows)
    first_batch_converted = prepare_arrow_table_for_pg_adbc(first_batch_table)

    # Second batch excludes id=1 to preserve note1
    second_batch_vectors = generate_vectors(n_rows, dim)
    second_batch_rows = [
        EmbeddingRecord(id=i + 1, embedding=vec, note="second_batch")
        for i, vec in enumerate(second_batch_vectors)
        if i != 0
    ]
    second_batch_table = EmbeddingRecord.pydantic_list_to_arrow_table(second_batch_rows)
    second_batch_converted = prepare_arrow_table_for_pg_adbc(second_batch_table)

    # Act - Insert first batch
    assert first_batch_converted.schema.field("embedding").type == pa.binary()
    assert second_batch_converted.schema.field("embedding").type == pa.binary()
    with get_adbc_cursor() as cur:
        try:
            cur.execute("BEGIN")
            cur.execute("DROP TABLE IF EXISTS emb_arrow_staging")
            cur.execute("CREATE TEMP TABLE emb_arrow_staging (LIKE emb_arrow INCLUDING DEFAULTS) ON COMMIT DROP")
            cur.adbc_ingest("emb_arrow_staging", first_batch_converted, mode="append", temporary=True)
            cur.execute(
                """
                DELETE FROM emb_arrow t
                USING emb_arrow_staging s
                WHERE t.id = s.id;
                """,
            )
            cur.execute(
                """
                INSERT INTO emb_arrow
                SELECT * FROM emb_arrow_staging;
                """,
            )
            cur.execute("COMMIT")
        except Exception:
            cur.execute("ROLLBACK")
            raise
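
    # adbc_ingest has no upsert mode, so each batch is appended to a TEMP staging
    # table and merged with DELETE ... USING + INSERT ... SELECT in one transaction.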

    # Act - Insert second batch (partial upsert)
    with get_adbc_cursor() as cur:
        try:
            cur.execute("BEGIN")
            cur.execute("DROP TABLE IF EXISTS emb_arrow_staging")
            cur.execute("CREATE TEMP TABLE emb_arrow_staging (LIKE emb_arrow INCLUDING DEFAULTS) ON COMMIT DROP")
            cur.adbc_ingest("emb_arrow_staging", second_batch_converted, mode="append", temporary=True)
            cur.execute(
                """
                DELETE FROM emb_arrow t
                USING emb_arrow_staging s
                WHERE t.id = s.id;
                """,
            )
            cur.execute(
                """
                INSERT INTO emb_arrow
                SELECT * FROM emb_arrow_staging;
                """,
            )
            cur.execute("COMMIT")
        except Exception:
            cur.execute("ROLLBACK")
            raise

    # Assert
    with get_adbc_cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM emb_arrow")
        count = cur.fetchone()[0]  # pyright: ignore[reportUnknownVariableType,reportOptionalSubscript]
        assert count == n_rows, f"Expected {n_rows} records, got {count}"

        # Verify id=1 kept note1, others have second_batch
        cur.execute("SELECT note FROM emb_arrow WHERE id = 1")
        id1_note = cur.fetchone()[0]  # pyright: ignore[reportUnknownVariableType,reportOptionalSubscript]
        assert id1_note == "note1", f"Expected id=1 to have 'note1', got '{id1_note}'"

        cur.execute("SELECT DISTINCT note FROM emb_arrow WHERE id > 1 ORDER BY note")
        other_notes = [row[0] for row in cur.fetchall()]  # pyright: ignore[reportUnknownVariableType]
        assert other_notes == ["second_batch"], f"Expected other records to have 'second_batch', got {other_notes}"

        # Verify similarity search works
        cur.execute(
            """
            SELECT e2.id,
                   (SELECT embedding FROM emb_arrow WHERE id = 1) <-> e2.embedding AS dist
            FROM emb_arrow e2
            ORDER BY dist ASC, id ASC
            """,
        )
        out = coerce_id_dist(cur.fetchall())
        ids_by_dist = [r[0] for r in out]
        assert ids_by_dist[0] == 1
        assert len(ids_by_dist) == n_rows


# Type aliases for database result tuples
JsonbTestRow = tuple[int, t.Dict[str, t.Any], str]
CountRow = tuple[int]
IdRow = tuple[int]


class JsonbTestRecord(ArrowTableBaseModel):
    """Test model that creates the extension<arrow.json> schema type."""

    id: int
    jsonb_data: t.Dict[str, t.Any]  # This will create extension<arrow.json>
    name: str


@pytest.mark.integration_db
def test_adbc_jsonb_arrow_extension_works() -> None:
    """Test that Arrow extension<arrow.json> works with PostgreSQL JSONB via ADBC using the shim."""
    # Arrange
    # Create test data with a Dict field, which produces an extension<arrow.json> column
    test_data = [
        JsonbTestRecord(
            id=1,
            jsonb_data={"key": "value", "array": [1, 2, 3], "nested": {"deep": True}},
            name="test_record",
        ),
        JsonbTestRecord(
            id=2,
            jsonb_data={"another": "json", "number": 42},
            name="second_record",
        ),
    ]

    # Create the Arrow table - this will have an extension<arrow.json> schema
    arrow_table = JsonbTestRecord.pydantic_list_to_arrow_table(test_data)

    # Verify the original table has the extension<arrow.json> type
    assert str(arrow_table.schema.field("jsonb_data").type) == "extension<arrow.json>"  # pyright: ignore[reportUnknownArgumentType]

    converted_table = prepare_arrow_table_for_pg_adbc(arrow_table)

    # Act - Verify conversion to binary with a 0x01 prefix
    assert converted_table.schema.field("jsonb_data").type == pa.binary()
    # Check that the binary format has the 0x01 prefix (JSONB version byte)
    binary_data = converted_table["jsonb_data"][0].as_py()
    assert binary_data.startswith(b"\x01"), "JSONB binary should start with version byte 0x01"

    with get_adbc_cursor() as cur:
        cur.execute("CREATE SCHEMA IF NOT EXISTS bills")
        cur.execute("DROP TABLE IF EXISTS jsonb_test")
        cur.execute("CREATE TEMP TABLE jsonb_test (id BIGINT PRIMARY KEY, jsonb_data JSONB, name TEXT)")

        # This should now work with the converted table
        cur.adbc_ingest("jsonb_test", converted_table, mode="append", temporary=True)

        # Assert
        # Verify the data was inserted correctly
        cur.execute("SELECT COUNT(*) FROM jsonb_test")
        count_result = t.cast(CountRow | None, cur.fetchone())
        assert count_result is not None
        count = count_result[0]
        assert count == 2, f"Expected 2 records, got {count}"

        # Verify the JSON data is properly stored and queryable
        cur.execute("SELECT id, jsonb_data, name FROM jsonb_test ORDER BY id")
        rows = t.cast(t.List[JsonbTestRow], cur.fetchall())

        # Check the first record
        first_row = rows[0]
        assert first_row[0] == 1
        assert first_row[2] == "test_record"
        first_json = first_row[1]
        # ADBC returns JSONB as a string, so parse it
        if isinstance(first_json, str):
            first_json = from_json(first_json)
        assert first_json["key"] == "value"
        assert first_json["array"] == [1, 2, 3]
        assert first_json["nested"]["deep"] is True

        # Check the second record
        second_row = rows[1]
        assert second_row[0] == 2
        assert second_row[2] == "second_record"
        second_json = second_row[1]
        # ADBC returns JSONB as a string, so parse it
        if isinstance(second_json, str):
            second_json = from_json(second_json)
        assert second_json["another"] == "json"
        assert second_json["number"] == 42

        # Test JSON querying capabilities
        cur.execute("SELECT id FROM jsonb_test WHERE jsonb_data->>'key' = 'value'")
        json_query_result = t.cast(IdRow | None, cur.fetchone())
        assert json_query_result is not None
        assert json_query_result[0] == 1
```
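
For reference, this is roughly the shape of the shim the tests rely on. It's a minimal sketch, not the actual `prepare_arrow_table_for_pg_adbc`: the helper names and per-column structure here are hypothetical, and the pgvector byte layout (big-endian uint16 dimension, uint16 reserved, float32 values) is my understanding of its wire format rather than something the tests check. The JSONB layout (version byte `0x01` followed by UTF-8 JSON text) is exactly what the second test asserts.

```python
import struct
import typing as t

import pyarrow as pa


def _json_storage(col: pa.ChunkedArray) -> t.List[t.Optional[str]]:
    # extension<arrow.json> stores its values as utf8; pull out the raw JSON text.
    return pa.chunked_array([chunk.storage for chunk in col.chunks]).to_pylist()


def json_column_to_jsonb_binary(table: pa.Table, name: str) -> pa.Table:
    # JSONB binary receive format: version byte 0x01, then UTF-8 JSON text.
    idx = table.schema.get_field_index(name)
    blobs = [
        None if v is None else b"\x01" + v.encode("utf-8")
        for v in _json_storage(table.column(idx))
    ]
    return table.set_column(idx, pa.field(name, pa.binary()), pa.array(blobs, pa.binary()))


def vector_column_to_pgvector_binary(table: pa.Table, name: str) -> pa.Table:
    # Assumed pgvector receive format: uint16 dim, uint16 reserved (0),
    # then dim big-endian float32 values.
    idx = table.schema.get_field_index(name)
    vectors = t.cast(t.List[t.Optional[t.List[float]]], table.column(idx).to_pylist())
    blobs = [
        None if v is None else struct.pack(f">HH{len(v)}f", len(v), 0, *v)
        for v in vectors
    ]
    return table.set_column(idx, pa.field(name, pa.binary()), pa.array(blobs, pa.binary()))
```

As I understand it, this works because the driver's COPY path sends the binary values through verbatim, and the server then interprets each field using the destination column type's binary receive function (`jsonb_recv`, pgvector's `vector_recv`).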