If you're able to do multithreaded indexing, it will go much faster.
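Something along these lines, for example (a rough sketch reusing
fetch_data_batch, solr_url and the other names from your script; psycopg2
cursors are not thread-safe, so the fetching stays serial and only the
Solr adds run in parallel):

import concurrent.futures

from pysolr import Solr

def index_batch(solr_url, docs):
    # One client per call keeps the worker threads independent.
    Solr(solr_url).add(docs)
    return len(docs)

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    futures = []
    offset = 0
    while True:
        rows = fetch_data_batch(cursor, postgres_query, StartDT,
                                batch_size, offset)
        if not rows:
            break
        column_names = [desc[0] for desc in cursor.description]
        docs = [
            {
                "id": str(d["id"]),
                "name": d["name"],
                "update_at": (d["update_at"].isoformat()
                              if "update_at" in d else None),
            }
            for d in (dict(zip(column_names, r)) for r in rows)
        ]
        futures.append(pool.submit(index_batch, solr_url, docs))
        offset += batch_size
    for f in concurrent.futures.as_completed(futures):
        f.result()  # surfaces any indexing error

Solr(solr_url).commit()  # one commit at the end instead of per batch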
On Thu, 14 Dec, 2023, 6:51 pm Vince McMahon, <[email protected]>
wrote:
> Hi,
>
> I have written a custom Python program to index, which may give better
> control than DIH.
>
> But it is still indexing at most 5000 documents. I have enabled debug
> logging (after updating log4j2.xml), and I am doing a count of documents
> after each batch of indexing.
>
> I would really appreciate your help in figuring out why it indexes only
> 5000 documents.
>
> The solr.log is in the enclosed zip.
>
> import json
> import logging
> import time
>
> import psycopg2
> import requests
> from pysolr import Solr
>
> try:
>     conn = psycopg2.connect(**postgres_connection)
>     cursor = conn.cursor()
>
>     # Replace "your_table" and "your_columns" with your actual table and columns
>     postgres_query = """
>         SELECT
>             ROW_NUMBER() OVER (ORDER BY update_at)
>             , id, name, update_at
>         FROM city
>         WHERE update_at >= %s
>         LIMIT %s OFFSET %s
>     """
>     StartDT = '2023-07-29 00:00:00.0000'
>
>     batch_size = 1000  # Set your desired batch size
>     offset = 0
>
>     while True:
>         rows = fetch_data_batch(cursor, postgres_query, StartDT,
>                                 batch_size, offset)
>         if not rows:
>             break  # No more data
>
>         print(f"Type of row: {type(rows[0])}")
>         print(f"Example row content: {rows[0]}")
>
>         # Convert tuples to dicts, which is what Solr wants;
>         # get the column names from cursor.description
>         column_names = [desc[0] for desc in cursor.description]
>         rows_as_dicts = [dict(zip(column_names, row)) for row in rows]
>
>         print(f"Type of row after rows_as_dicts: {type(rows_as_dicts[0])}")
>
>         # Connect to Solr
>         solr = Solr(solr_url, always_commit=True)
>
>         # Build the documents to index into Solr
>         docs = [
>             {
>                 # Each row is a dictionary whose keys are field names
>                 "id": str(row_dict["id"]),
>                 "name": row_dict["name"],
>                 "update_at": (row_dict["update_at"].isoformat()
>                               if "update_at" in row_dict else None),
>                 # Add more fields as needed
>             }
>             for row_dict in rows_as_dicts
>         ]
>
>         # Log the content of each document
>         for doc in docs:
>             logging.debug(f"Indexing document: {doc}")
>
>         # Index data into Solr and get the response
>         response = solr.add(docs)
>
>         solr_core = 'p4a'
>
>         # Construct the Solr select query URL
>         select_query_url = (f"http://localhost:8983/solr/{solr_core}"
>                             "/select?q=*:*&rows=0&wt=json")
>
>         # Request the select query response
>         select_response = requests.get(select_query_url)
>
>         try:
>             # Try to parse the response as JSON
>             select_data = json.loads(select_response.text)
>
>             # Extract the number of documents from the response
>             num_docs = select_data.get("response", {}).get("numFound", -1)
>
>             print(f"Total number of documents in the core "
>                   f"'{solr_core}': {num_docs}")
>         except json.decoder.JSONDecodeError as e:
>             print(f"Error decoding JSON response: {e}")
>             print(f"Response content: {select_response.text}")
>
>         # Log the outcome of the indexing operation
>         logging.info("Committing batch to Solr")
>
>         # Commit the changes to Solr
>         solr.commit()
>         time.sleep(1)
>         offset += batch_size
>
> finally:
>     # Close PostgreSQL connection
>     cursor.close()
>     conn.close()
>
> Example of content from Postgres:
> Type of row: <class 'tuple'>
> Example row content: (12001, 2175, '82a4343bc11b04f42ab309e161a0bdf3',
> datetime.datetime(2023, 7, 31, 9, 39, 13, 463165))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
> SELECT
> ROW_NUMBER() over (order by update_at)
> , id, name, update_at
> FROM city
> WHERE update_at >= '2023-07-29 00:00:00.0000'
> LIMIT 1000 OFFSET 13000
>
> Type of row: <class 'tuple'>
> Example row content: (13001, 3089, '048e212232b13f05559c69a81a268f73',
> datetime.datetime(2023, 7, 31, 14, 22, 14, 1572))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
> SELECT
> ROW_NUMBER() over (order by update_at)
> , id, name, update_at
> FROM city
> WHERE update_at >= '2023-07-29 00:00:00.0000'
> LIMIT 1000 OFFSET 14000
>
> Type of row: <class 'tuple'>
> Example row content: (14001, 4939, '902ee8f138eb4e83d94b13e34f402de3',
> datetime.datetime(2023, 7, 31, 18, 59, 8, 906065))
> Type of row after rows_as_dicts: <class 'dict'>
> Total number of documents in the core 'p4a': 5000
>
> SELECT
> ROW_NUMBER() over (order by update_at)
> , id, name, update_at
> FROM city
> WHERE update_at >= '2023-07-29 00:00:00.0000'
> LIMIT 1000 OFFSET 15000
>
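One more thing worth checking: Solr overwrites documents that share an id
(the uniqueKey), so numFound can stall at 5000 even while solr.add() keeps
succeeding; it's worth verifying that city.id is actually unique across
batches. LIMIT/OFFSET paging can also skip or repeat rows when update_at
values change between batches. A keyset-pagination sketch, just as an idea,
assuming (update_at, id) gives a stable ordering for city:

# Page on (update_at, id) instead of OFFSET, so each batch starts
# exactly where the previous one ended.
keyset_query = """
    SELECT id, name, update_at
    FROM city
    WHERE (update_at, id) > (%s, %s)
    ORDER BY update_at, id
    LIMIT %s
"""

last_update_at, last_id = '2023-07-29 00:00:00.0000', 0
while True:
    cursor.execute(keyset_query, (last_update_at, last_id, batch_size))
    rows = cursor.fetchall()
    if not rows:
        break
    # ... build docs and solr.add(docs) exactly as before ...
    last_update_at, last_id = rows[-1][2], rows[-1][0]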