This is an automated email from the ASF dual-hosted git repository.

jgemignani pushed a commit to branch PG12
in repository https://gitbox.apache.org/repos/asf/age.git


The following commit(s) were added to refs/heads/PG12 by this push:
     new 9e0c05d5 Revamp age csv loader (#2044) (#2065)
9e0c05d5 is described below

commit 9e0c05d56041a6fcee237fa2598f2d039be139d6
Author: Muhammad Taha Naveed <[email protected]>
AuthorDate: Thu Aug 22 22:32:47 2024 +0500

    Revamp age csv loader (#2044) (#2065)
    
    * Allow 0 as entry_id
    
    - No regression tests were impacted by this change.
    
    * Use batch inserts to improve performance
    
    - Changed heap_insert to heap_multi_insert since it is faster than
      calling heap_insert() in a loop. When multiple tuples can be inserted
      on a single page, it writes just a single WAL record covering all of
      them and only needs to lock/unlock the page once.
    
    - BATCH_SIZE is set to 1000, which is the number of tuples to insert in
      a single batch. This number was chosen after some experimentation.
    
    - Change some of the field names to avoid confusion.
    
    * Use sequence for generating ids for edge and vertex
    
    - Sequence is not used if the id_field_exists is true in
      load_labels_from_file function, since the entry id is present in the
      csv.
    
    * Add function to create temporary table for ids
    
    - Create a temporary table and populate it with the already generated
      vertex ids. A unique index is created on the id column to ensure that
      new ids generated (using the entry id from the csv) are unique.
    
    * Insert generated ids in the temporary table to enforce uniqueness
    
    - Insert ids in the temporary table and also update its index to
      enforce uniqueness.
    - If the entry id provided in the CSV is greater than the current
      sequence value, the sequence value is updated to match the entry ID.
      For example:
      Suppose the current sequence value is 1, and the CSV entry ID is 2.
      If we use 2 but do not update the sequence to 2, the next time the
      CREATE clause is used, the sequence will return 2 as an entry id,
      resulting in a duplicate.
    - Update batch functions
    
    * Add functions to create graph and label automatically
    
    - These functions will check existence of graph and label, and create
      them if they don't exist.
    
    * Add regression tests
---
 regress/expected/age_load.out           | 139 +++++++++++----
 regress/sql/age_load.sql                |  74 +++++++-
 src/backend/catalog/ag_label.c          |   5 +
 src/backend/commands/graph_commands.c   |  32 ++--
 src/backend/utils/load/ag_load_edges.c  | 133 ++++++++++++--
 src/backend/utils/load/ag_load_labels.c | 298 +++++++++++++++++++++++++++++---
 src/backend/utils/load/age_load.c       | 126 +++++++++++++-
 src/include/catalog/ag_label.h          |   2 +
 src/include/commands/graph_commands.h   |   1 +
 src/include/utils/graphid.h             |   2 +-
 src/include/utils/load/ag_load_edges.h  |  12 +-
 src/include/utils/load/ag_load_labels.h |  11 +-
 src/include/utils/load/age_load.h       |  17 ++
 13 files changed, 741 insertions(+), 111 deletions(-)

diff --git a/regress/expected/age_load.out b/regress/expected/age_load.out
index 8635a499..8f216341 100644
--- a/regress/expected/age_load.out
+++ b/regress/expected/age_load.out
@@ -19,6 +19,7 @@
 \! cp -r regress/age_load/data regress/instance/data/age_load
 LOAD 'age';
 SET search_path TO ag_catalog;
+-- Create a country using CREATE clause
 SELECT create_graph('agload_test_graph');
 NOTICE:  graph "agload_test_graph" has been created
  create_graph 
@@ -26,34 +27,79 @@ NOTICE:  graph "agload_test_graph" has been created
  
 (1 row)
 
-SELECT create_vlabel('agload_test_graph','Country');
-NOTICE:  VLabel "Country" has been created
- create_vlabel 
----------------
- 
+SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1}) 
RETURN n$$) as (n agtype);
+                                        n                                      
   
+----------------------------------------------------------------------------------
+ {"id": 844424930131969, "label": "Country", "properties": {"__id__": 
1}}::vertex
 (1 row)
 
+--
+-- Load countries with id
+--
 SELECT load_labels_from_file('agload_test_graph', 'Country',
-    'age_load/countries.csv');
+    'age_load/countries.csv', true);
  load_labels_from_file 
 -----------------------
  
 (1 row)
 
-SELECT create_vlabel('agload_test_graph','City');
-NOTICE:  VLabel "City" has been created
- create_vlabel 
----------------
- 
+-- A temporary table should have been created with 54 ids; 1 from CREATE and 
53 from file
+SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids";
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Sequence should be equal to max entry id i.e. 248
+SELECT currval('agload_test_graph."Country_id_seq"')=248;
+ ?column? 
+----------
+ t
 (1 row)
 
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+    'age_load/countries.csv', true);
+ERROR:  Cannot insert duplicate vertex id: 844424930131970
+HINT:  Entry id 2 is already used
+--
+-- Load cities with id
+--
+-- Should create City label automatically and load cities
 SELECT load_labels_from_file('agload_test_graph', 'City',
-    'age_load/cities.csv');
+    'age_load/cities.csv', true);
+NOTICE:  VLabel "City" has been created
  load_labels_from_file 
 -----------------------
  
 (1 row)
 
+-- Temporary table should have 54+72485 rows now
+SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids";
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Sequence should be equal to max entry id i.e. 146941
+SELECT currval('agload_test_graph."City_id_seq"')=146941;
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'City',
+    'age_load/cities.csv', true);
+ERROR:  Cannot insert duplicate vertex id: 1125899906842777
+HINT:  Entry id 153 is already used
+--
+-- Load edges -- Connects cities to countries
+--
+-- Should error out for using vertex label
+SELECT load_edges_from_file('agload_test_graph', 'Country',
+    'age_load/edges.csv');
+ERROR:  label "Country" already exists as edge label
 SELECT create_elabel('agload_test_graph','has_city');
 NOTICE:  ELabel "has_city" has been created
  create_elabel 
@@ -68,6 +114,17 @@ SELECT load_edges_from_file('agload_test_graph', 'has_city',
  
 (1 row)
 
+-- Sequence should be equal to number of edges loaded i.e. 72485
+SELECT currval('agload_test_graph."has_city_id_seq"')=72485;
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Should error out for using edge label
+SELECT load_labels_from_file('agload_test_graph', 'has_city',
+     'age_load/cities.csv');
+ERROR:  label "has_city" already exists as vertex label
 SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type
 FROM information_schema.tables
 WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC;
@@ -83,7 +140,7 @@ WHERE table_schema = 'agload_test_graph' ORDER BY table_name 
ASC;
 SELECT COUNT(*) FROM agload_test_graph."Country";
  count 
 -------
-    53
+    54
 (1 row)
 
 SELECT COUNT(*) FROM agload_test_graph."City";
@@ -101,7 +158,7 @@ SELECT COUNT(*) FROM agload_test_graph."has_city";
 SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) RETURN n$$) as (n 
agtype);
  count 
 -------
- 72538
+ 72539
 (1 row)
 
 SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN 
e$$) as (n agtype);
@@ -110,6 +167,17 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH 
(a)-[e]->(b) RETURN e$$
  72485
 (1 row)
 
+--
+-- Load countries and cities without id
+--
+-- Should load countries in Country label without error since it should use 
sequence now
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+    'age_load/countries.csv', false);
+ load_labels_from_file 
+-----------------------
+ 
+(1 row)
+
 SELECT create_vlabel('agload_test_graph','Country2');
 NOTICE:  VLabel "Country2" has been created
  create_vlabel 
@@ -153,6 +221,7 @@ SELECT COUNT(*) FROM agload_test_graph."City2";
 SELECT id FROM agload_test_graph."Country" LIMIT 10;
        id        
 -----------------
+ 844424930131969
  844424930131970
  844424930131971
  844424930131974
@@ -162,7 +231,6 @@ SELECT id FROM agload_test_graph."Country" LIMIT 10;
  844424930131996
  844424930132002
  844424930132023
- 844424930132025
 (10 rows)
 
 SELECT id FROM agload_test_graph."Country2" LIMIT 10;
@@ -180,13 +248,16 @@ SELECT id FROM agload_test_graph."Country2" LIMIT 10;
  1688849860263946
 (10 rows)
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
       id(n)      |  n.name   | n.iso2 
 -----------------+-----------+--------
  844424930131990 | "Belgium" | "BE"
-(1 row)
+ 844424930132223 | "Belgium" | "BE"
+(2 rows)
 
+-- Should return 1 row
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
       id(n)       |  n.name   | n.iso2 
@@ -194,13 +265,16 @@ SELECT * FROM cypher('agload_test_graph', 
$$MATCH(n:Country2 {iso2 : 'BE'})
  1688849860263942 | "Belgium" | "BE"
 (1 row)
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
       id(n)      |  n.name   | n.iso2 
 -----------------+-----------+--------
  844424930131983 | "Austria" | "AT"
-(1 row)
+ 844424930132221 | "Austria" | "AT"
+(2 rows)
 
+-- Should return 1 row
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
       id(n)       |  n.name   | n.iso2 
@@ -208,15 +282,17 @@ SELECT * FROM cypher('agload_test_graph', 
$$MATCH(n:Country2 {iso2 : 'AT'})
  1688849860263940 | "Austria" | "AT"
 (1 row)
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$
     MATCH (u:Country {region : "Europe"})
     WHERE u.name =~ 'Cro.*'
-    RETURN u.name, u.region
-$$) AS (result_1 agtype, result_2 agtype);
- result_1  | result_2 
------------+----------
- "Croatia" | "Europe"
-(1 row)
+    RETURN id(u), u.name, u.region
+$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype);
+      id(u)      | result_1  | result_2 
+-----------------+-----------+----------
+ 844424930132023 | "Croatia" | "Europe"
+ 844424930132226 | "Croatia" | "Europe"
+(2 rows)
 
 SELECT drop_graph('agload_test_graph', true);
 NOTICE:  drop cascades to 7 other objects
@@ -236,22 +312,11 @@ NOTICE:  graph "agload_test_graph" has been dropped
 --
 -- Test property type conversion
 --
-SELECT create_graph('agload_conversion');
-NOTICE:  graph "agload_conversion" has been created
- create_graph 
---------------
- 
-(1 row)
-
 -- vertex: load as agtype
-SELECT create_vlabel('agload_conversion','Person1');
-NOTICE:  VLabel "Person1" has been created
- create_vlabel 
----------------
- 
-(1 row)
-
+-- Should create graph and label automatically
 SELECT load_labels_from_file('agload_conversion', 'Person1', 
'age_load/conversion_vertices.csv', true, true);
+NOTICE:  graph "agload_conversion" has been created
+NOTICE:  VLabel "Person1" has been created
  load_labels_from_file 
 -----------------------
  
diff --git a/regress/sql/age_load.sql b/regress/sql/age_load.sql
index cee34f59..0d76654f 100644
--- a/regress/sql/age_load.sql
+++ b/regress/sql/age_load.sql
@@ -22,20 +22,65 @@
 LOAD 'age';
 
 SET search_path TO ag_catalog;
+
+-- Create a country using CREATE clause
 SELECT create_graph('agload_test_graph');
 
-SELECT create_vlabel('agload_test_graph','Country');
+SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1}) 
RETURN n$$) as (n agtype);
+
+--
+-- Load countries with id
+--
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+    'age_load/countries.csv', true);
+
+-- A temporary table should have been created with 54 ids; 1 from CREATE and 
53 from file
+SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids";
+
+-- Sequence should be equal to max entry id i.e. 248
+SELECT currval('agload_test_graph."Country_id_seq"')=248;
+
+-- Should error out on loading the same file again due to duplicate id
 SELECT load_labels_from_file('agload_test_graph', 'Country',
-    'age_load/countries.csv');
+    'age_load/countries.csv', true);
+
+--
+-- Load cities with id
+--
 
-SELECT create_vlabel('agload_test_graph','City');
+-- Should create City label automatically and load cities
 SELECT load_labels_from_file('agload_test_graph', 'City',
-    'age_load/cities.csv');
+    'age_load/cities.csv', true);
+
+-- Temporary table should have 54+72485 rows now
+SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids";
+
+-- Sequence should be equal to max entry id i.e. 146941
+SELECT currval('agload_test_graph."City_id_seq"')=146941;
+
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'City',
+    'age_load/cities.csv', true);
+
+--
+-- Load edges -- Connects cities to countries
+--
+
+-- Should error out for using vertex label
+SELECT load_edges_from_file('agload_test_graph', 'Country',
+    'age_load/edges.csv');
 
 SELECT create_elabel('agload_test_graph','has_city');
 SELECT load_edges_from_file('agload_test_graph', 'has_city',
      'age_load/edges.csv');
 
+-- Sequence should be equal to number of edges loaded i.e. 72485
+SELECT currval('agload_test_graph."has_city_id_seq"')=72485;
+
+-- Should error out for using edge label
+SELECT load_labels_from_file('agload_test_graph', 'has_city',
+     'age_load/cities.csv');
+
 SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type
 FROM information_schema.tables
 WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC;
@@ -48,6 +93,14 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) 
RETURN n$$) as (n ag
 
 SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN 
e$$) as (n agtype);
 
+--
+-- Load countries and cities without id
+--
+
+-- Should load countries in Country label without error since it should use 
sequence now
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+    'age_load/countries.csv', false);
+
 SELECT create_vlabel('agload_test_graph','Country2');
 SELECT load_labels_from_file('agload_test_graph', 'Country2',
                              'age_load/countries.csv', false);
@@ -62,31 +115,36 @@ SELECT COUNT(*) FROM agload_test_graph."City2";
 SELECT id FROM agload_test_graph."Country" LIMIT 10;
 SELECT id FROM agload_test_graph."Country2" LIMIT 10;
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
+-- Should return 1 row
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
+-- Should return 1 row
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$
     MATCH (u:Country {region : "Europe"})
     WHERE u.name =~ 'Cro.*'
-    RETURN u.name, u.region
-$$) AS (result_1 agtype, result_2 agtype);
+    RETURN id(u), u.name, u.region
+$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype);
 
 SELECT drop_graph('agload_test_graph', true);
 
 --
 -- Test property type conversion
 --
-SELECT create_graph('agload_conversion');
 
 -- vertex: load as agtype
-SELECT create_vlabel('agload_conversion','Person1');
+
+-- Should create graph and label automatically
 SELECT load_labels_from_file('agload_conversion', 'Person1', 
'age_load/conversion_vertices.csv', true, true);
 SELECT * FROM cypher('agload_conversion', $$ MATCH (n:Person1) RETURN 
properties(n) $$) as (a agtype);
 
diff --git a/src/backend/catalog/ag_label.c b/src/backend/catalog/ag_label.c
index 6cf2a552..5ca2cdaf 100644
--- a/src/backend/catalog/ag_label.c
+++ b/src/backend/catalog/ag_label.c
@@ -174,6 +174,11 @@ char get_label_kind(const char *label_name, Oid 
label_graph)
     }
 }
 
+char *get_label_seq_relation_name(const char *label_name)
+{
+    return psprintf("%s_id_seq", label_name);
+}
+
 PG_FUNCTION_INFO_V1(_label_name);
 
 /*
diff --git a/src/backend/commands/graph_commands.c 
b/src/backend/commands/graph_commands.c
index a558d298..efce8a35 100644
--- a/src/backend/commands/graph_commands.c
+++ b/src/backend/commands/graph_commands.c
@@ -42,6 +42,7 @@
 #include "catalog/ag_label.h"
 #include "commands/label_commands.h"
 #include "utils/graphid.h"
+#include "commands/graph_commands.h"
 #include "utils/name_validation.h"
 
 /*
@@ -60,10 +61,7 @@ PG_FUNCTION_INFO_V1(create_graph);
 /* function that is evoked for creating a graph */
 Datum create_graph(PG_FUNCTION_ARGS)
 {
-    char *graph;
     Name graph_name;
-    char *graph_name_str;
-    Oid nsp_id;
 
     /* if no argument is passed with the function, graph name cannot be null */
     if (PG_ARGISNULL(0))
@@ -75,6 +73,23 @@ Datum create_graph(PG_FUNCTION_ARGS)
     /* gets graph name as function argument */
     graph_name = PG_GETARG_NAME(0);  
 
+    create_graph_internal(graph_name);
+
+    ereport(NOTICE,
+            (errmsg("graph \"%s\" has been created", NameStr(*graph_name))));
+
+    /* 
+     * According to postgres specification of c-language functions
+     * if function returns void this is the syntax.
+     */
+    PG_RETURN_VOID(); 
+}
+
+Oid create_graph_internal(const Name graph_name)
+{
+    Oid nsp_id;
+    char *graph_name_str;
+
     graph_name_str = NameStr(*graph_name);
 
     /* checking if the name of the graph falls under the pre-decided graph 
naming conventions(regex) */
@@ -101,15 +116,10 @@ Datum create_graph(PG_FUNCTION_ARGS)
     CommandCounterIncrement();
 
     /* Create the default label tables */
-    graph = graph_name->data;
-    create_label(graph, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, NIL);
-    create_label(graph, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL);
+    create_label(graph_name_str, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, 
NIL);
+    create_label(graph_name_str, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL);
 
-    ereport(NOTICE,
-            (errmsg("graph \"%s\" has been created", NameStr(*graph_name))));
-
-    /* according to postgres specification of c-language functions if function 
returns void this is the syntax */
-    PG_RETURN_VOID(); 
+    return nsp_id;
 }
 
 static Oid create_schema_for_graph(const Name graph_name)
diff --git a/src/backend/utils/load/ag_load_edges.c 
b/src/backend/utils/load/ag_load_edges.c
index 4f2f66a3..71683bf4 100644
--- a/src/backend/utils/load/ag_load_edges.c
+++ b/src/backend/utils/load/ag_load_edges.c
@@ -20,9 +20,13 @@
 #include "postgres.h"
 
 #include "utils/load/ag_load_edges.h"
-#include "utils/load/age_load.h"
 #include "utils/load/csv.h"
 
+void init_edge_batch_insert(batch_insert_state **batch_state,
+                            char *label_name, Oid graph_oid);
+void finish_edge_batch_insert(batch_insert_state **batch_state,
+                              char *label_name, Oid graph_oid);
+
 void edge_field_cb(void *field, size_t field_len, void *data)
 {
 
@@ -58,6 +62,7 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
 {
 
     csv_edge_reader *cr = (csv_edge_reader*)data;
+    batch_insert_state *batch_state = cr->batch_state;
 
     size_t i, n_fields;
     int64 start_id_int;
@@ -68,9 +73,9 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
     graphid end_vertex_graph_id;
     int end_vertex_type_id;
 
-    graphid object_graph_id;
-
-    agtype* props = NULL;
+    graphid edge_id;
+    int64 entry_id;
+    TupleTableSlot *slot;
 
     n_fields = cr->cur_field;
 
@@ -89,7 +94,8 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
     }
     else
     {
-        object_graph_id = make_graphid(cr->object_id, (int64)cr->row);
+        entry_id = nextval_internal(cr->label_seq_relid, true);
+        edge_id = make_graphid(cr->label_id, entry_id);
 
         start_id_int = strtol(cr->fields[0], NULL, 10);
         start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid);
@@ -99,14 +105,35 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
         start_vertex_graph_id = make_graphid(start_vertex_type_id, 
start_id_int);
         end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
 
-        props = create_agtype_from_list_i(cr->header, cr->fields,
-                                          n_fields, 4, cr->load_as_agtype);
-
-        insert_edge_simple(cr->graph_oid, cr->object_name,
-                           object_graph_id, start_vertex_graph_id,
-                           end_vertex_graph_id, props);
-
-        pfree(props);
+        /* Get the appropriate slot from the batch state */
+        slot = batch_state->slots[batch_state->num_tuples];
+
+        /* Clear the slots contents */
+        ExecClearTuple(slot);
+
+        /* Fill the values in the slot */
+        slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id);
+        slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
+        slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
+        slot->tts_values[3] = AGTYPE_P_GET_DATUM(
+                                create_agtype_from_list_i(
+                                    cr->header, cr->fields,
+                                    n_fields, 4, cr->load_as_agtype));
+        slot->tts_isnull[0] = false;
+        slot->tts_isnull[1] = false;
+        slot->tts_isnull[2] = false;
+        slot->tts_isnull[3] = false;
+
+        /* Make the slot as containing virtual tuple */
+        ExecStoreVirtualTuple(slot);
+        batch_state->num_tuples++;
+
+        if (batch_state->num_tuples >= batch_state->max_tuples)
+        {
+            /* Insert the batch when it is full (i.e. BATCH_SIZE) */
+            insert_batch(batch_state, cr->label_name, cr->graph_oid);
+            batch_state->num_tuples = 0;
+        }
     }
 
     for (i = 0; i < n_fields; ++i)
@@ -119,7 +146,6 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
         ereport(NOTICE,(errmsg("THere is some error")));
     }
 
-
     cr->cur_field = 0;
     cr->curr_row_length = 0;
     cr->row += 1;
@@ -152,8 +178,8 @@ static int is_term(unsigned char c)
 int create_edges_from_csv_file(char *file_path,
                                char *graph_name,
                                Oid graph_oid,
-                               char *object_name,
-                               int object_id,
+                               char *label_name,
+                               int label_id,
                                bool load_as_agtype)
 {
 
@@ -163,6 +189,7 @@ int create_edges_from_csv_file(char *file_path,
     size_t bytes_read;
     unsigned char options = 0;
     csv_edge_reader cr;
+    char *label_seq_name;
 
     if (csv_init(&p, options) != 0)
     {
@@ -180,6 +207,7 @@ int create_edges_from_csv_file(char *file_path,
                 (errmsg("Failed to open %s\n", file_path)));
     }
 
+    label_seq_name = get_label_seq_relation_name(label_name);
 
     memset((void*)&cr, 0, sizeof(csv_edge_reader));
     cr.alloc = 128;
@@ -189,10 +217,14 @@ int create_edges_from_csv_file(char *file_path,
     cr.curr_row_length = 0;
     cr.graph_name = graph_name;
     cr.graph_oid = graph_oid;
-    cr.object_name = object_name;
-    cr.object_id = object_id;
+    cr.label_name = label_name;
+    cr.label_id = label_id;
+    cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
     cr.load_as_agtype = load_as_agtype;
 
+    /* Initialize the batch insert state */
+    init_edge_batch_insert(&cr.batch_state, label_name, graph_oid);
+
     while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
     {
         if (csv_parse(&p, buf, bytes_read, edge_field_cb,
@@ -205,6 +237,9 @@ int create_edges_from_csv_file(char *file_path,
 
     csv_fini(&p, edge_field_cb, edge_row_cb, &cr);
 
+    /* Finish any remaining batch inserts */
+    finish_edge_batch_insert(&cr.batch_state, label_name, graph_oid);
+
     if (ferror(fp))
     {
         ereport(ERROR, (errmsg("Error while reading file %s\n", file_path)));
@@ -216,3 +251,65 @@ int create_edges_from_csv_file(char *file_path,
     csv_free(&p);
     return EXIT_SUCCESS;
 }
+
+/*
+ * Initialize the batch insert state for edges.
+ */
+void init_edge_batch_insert(batch_insert_state **batch_state,
+                            char *label_name, Oid graph_oid)
+{
+    Relation relation;
+    int i;
+
+    // Open a temporary relation to get the tuple descriptor
+    relation = table_open(get_label_relation(label_name, graph_oid), 
AccessShareLock);
+
+    // Initialize the batch insert state
+    *batch_state = (batch_insert_state *) palloc0(sizeof(batch_insert_state));
+    (*batch_state)->max_tuples = BATCH_SIZE;
+    (*batch_state)->slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE);
+    (*batch_state)->num_tuples = 0;
+
+    // Create slots
+    for (i = 0; i < BATCH_SIZE; i++)
+    {
+        (*batch_state)->slots[i] = MakeSingleTupleTableSlot(
+                                            RelationGetDescr(relation),
+                                            &TTSOpsHeapTuple);
+    }
+
+    table_close(relation, AccessShareLock);
+}
+
+/*
+ * Finish the batch insert for edges. Insert the
+ * remaining tuples in the batch state and clean up.
+ */
+void finish_edge_batch_insert(batch_insert_state **batch_state,
+                              char *label_name, Oid graph_oid)
+{
+    int i;
+    Relation relation;
+
+    if ((*batch_state)->num_tuples > 0)
+    {
+        insert_batch(*batch_state, label_name, graph_oid);
+        (*batch_state)->num_tuples = 0;
+    }
+
+    // Open a temporary relation to ensure resources are properly cleaned up
+    relation = table_open(get_label_relation(label_name, graph_oid), 
AccessShareLock);
+
+    // Free slots
+    for (i = 0; i < BATCH_SIZE; i++)
+    {
+        ExecDropSingleTupleTableSlot((*batch_state)->slots[i]);
+    }
+
+    // Clean up batch state
+    pfree((*batch_state)->slots);
+    pfree(*batch_state);
+    *batch_state = NULL;
+
+    table_close(relation, AccessShareLock);
+}
\ No newline at end of file
diff --git a/src/backend/utils/load/ag_load_labels.c 
b/src/backend/utils/load/ag_load_labels.c
index f377f1cb..b38c7fca 100644
--- a/src/backend/utils/load/ag_load_labels.c
+++ b/src/backend/utils/load/ag_load_labels.c
@@ -17,11 +17,26 @@
  * under the License.
  */
 #include "postgres.h"
+#include "executor/spi.h"
+#include "catalog/namespace.h"
+#include "executor/executor.h"
+#include "access/xact.h"
 
 #include "utils/load/ag_load_labels.h"
-#include "utils/load/age_load.h"
 #include "utils/load/csv.h"
 
+static void setup_temp_table_for_vertex_ids(char *graph_name);
+static void insert_batch_in_temp_table(batch_insert_state *batch_state,
+                                       Oid graph_oid, Oid relid);
+static void init_vertex_batch_insert(batch_insert_state **batch_state,
+                                     char *label_name, Oid graph_oid,
+                                     Oid temp_table_relid);
+static void finish_vertex_batch_insert(batch_insert_state **batch_state,
+                                       char *label_name, Oid graph_oid,
+                                       Oid temp_table_relid);
+static void insert_vertex_batch(batch_insert_state *batch_state, char 
*label_name,
+                                Oid graph_oid, Oid temp_table_relid);
+
 void vertex_field_cb(void *field, size_t field_len, void *data)
 {
 
@@ -55,16 +70,16 @@ void vertex_field_cb(void *field, size_t field_len, void 
*data)
 
 void vertex_row_cb(int delim __attribute__((unused)), void *data)
 {
-
     csv_vertex_reader *cr = (csv_vertex_reader*)data;
-    agtype *props = NULL;
+    batch_insert_state *batch_state = cr->batch_state;
     size_t i, n_fields;
-    graphid object_graph_id;
-    int64 label_id_int;
+    graphid vertex_id;
+    int64 entry_id;
+    TupleTableSlot *slot;
+    TupleTableSlot *temp_id_slot;
 
     n_fields = cr->cur_field;
 
-
     if (cr->row == 0)
     {
         cr->header_num = cr->cur_field;
@@ -82,36 +97,67 @@ void vertex_row_cb(int delim __attribute__((unused)), void 
*data)
     {
         if (cr->id_field_exists)
         {
-            label_id_int = strtol(cr->fields[0], NULL, 10);
+            entry_id = strtol(cr->fields[0], NULL, 10);
+            if (entry_id > cr->curr_seq_num)
+            {
+                DirectFunctionCall2(setval_oid,
+                                    ObjectIdGetDatum(cr->label_seq_relid),
+                                    Int64GetDatum(entry_id));
+                cr->curr_seq_num = entry_id;
+            }
         }
         else
         {
-            label_id_int = (int64)cr->row;
+            entry_id = nextval_internal(cr->label_seq_relid, true);
         }
 
-        object_graph_id = make_graphid(cr->object_id, label_id_int);
+        vertex_id = make_graphid(cr->label_id, entry_id);
 
-        props = create_agtype_from_list(cr->header, cr->fields,
-                                        n_fields, label_id_int,
-                                        cr->load_as_agtype);
-        insert_vertex_simple(cr->graph_oid, cr->object_name,
-                             object_graph_id, props);
-        pfree(props);
-    }
+        /* Get the appropriate slot from the batch state */
+        slot = batch_state->slots[batch_state->num_tuples];
+        temp_id_slot = batch_state->temp_id_slots[batch_state->num_tuples];
+
+        /* Clear the slots contents */
+        ExecClearTuple(slot);
+        ExecClearTuple(temp_id_slot);
 
+        /* Fill the values in the slot */
+        slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
+        slot->tts_values[1] = AGTYPE_P_GET_DATUM(
+                                create_agtype_from_list(cr->header, cr->fields,
+                                                        n_fields, entry_id,
+                                                        cr->load_as_agtype));
+        slot->tts_isnull[0] = false;
+        slot->tts_isnull[1] = false;
+
+        temp_id_slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
+        temp_id_slot->tts_isnull[0] = false;
+
+        /* Make the slot as containing virtual tuple */
+        ExecStoreVirtualTuple(slot);
+        ExecStoreVirtualTuple(temp_id_slot);
+
+        batch_state->num_tuples++;
+
+        if (batch_state->num_tuples >= batch_state->max_tuples)
+        {
+            /* Insert the batch when it is full (i.e. BATCH_SIZE) */
+            insert_vertex_batch(batch_state, cr->label_name, cr->graph_oid,
+                                cr->temp_table_relid);
+            batch_state->num_tuples = 0;
+        }
+    }
 
     for (i = 0; i < n_fields; ++i)
     {
         free(cr->fields[i]);
     }
 
-
     if (cr->error)
     {
         ereport(NOTICE,(errmsg("THere is some error")));
     }
 
-
     cr->cur_field = 0;
     cr->curr_row_length = 0;
     cr->row += 1;
@@ -144,8 +190,8 @@ static int is_term(unsigned char c)
 int create_labels_from_csv_file(char *file_path,
                                 char *graph_name,
                                 Oid graph_oid,
-                                char *object_name,
-                                int object_id,
+                                char *label_name,
+                                int label_id,
                                 bool id_field_exists,
                                 bool load_as_agtype)
 {
@@ -156,6 +202,8 @@ int create_labels_from_csv_file(char *file_path,
     size_t bytes_read;
     unsigned char options = 0;
     csv_vertex_reader cr;
+    char *label_seq_name;
+    Oid temp_table_relid;
 
     if (csv_init(&p, options) != 0)
     {
@@ -163,6 +211,13 @@ int create_labels_from_csv_file(char *file_path,
                 (errmsg("Failed to initialize csv parser\n")));
     }
 
+    temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
+    if (!OidIsValid(temp_table_relid))
+    {
+        setup_temp_table_for_vertex_ids(graph_name);
+        temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
+    }
+
     csv_set_space_func(&p, is_space);
     csv_set_term_func(&p, is_term);
 
@@ -173,6 +228,7 @@ int create_labels_from_csv_file(char *file_path,
                 (errmsg("Failed to open %s\n", file_path)));
     }
 
+    label_seq_name = get_label_seq_relation_name(label_name);
 
     memset((void*)&cr, 0, sizeof(csv_vertex_reader));
 
@@ -183,12 +239,28 @@ int create_labels_from_csv_file(char *file_path,
     cr.curr_row_length = 0;
     cr.graph_name = graph_name;
     cr.graph_oid = graph_oid;
-    cr.object_name = object_name;
-    cr.object_id = object_id;
+    cr.label_name = label_name;
+    cr.label_id = label_id;
     cr.id_field_exists = id_field_exists;
+    cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
     cr.load_as_agtype = load_as_agtype;
+    cr.temp_table_relid = temp_table_relid;
+    
+    if (cr.id_field_exists)
+    {
+        /*
+         * Set the curr_seq_num since we will need it to compare with
+         * incoming entry_id.
+         * 
+         * We can't use currval because it will error out if nextval was
+         * not called before in the session.
+         */
+        cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true);
+    }
 
-
+    /* Initialize the batch insert state */
+    init_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
+                             cr.temp_table_relid);
 
     while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
     {
@@ -202,6 +274,10 @@ int create_labels_from_csv_file(char *file_path,
 
     csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr);
 
+    /* Finish any remaining batch inserts */
+    finish_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
+                               cr.temp_table_relid);
+
     if (ferror(fp))
     {
         ereport(ERROR, (errmsg("Error while reading file %s\n",
@@ -214,3 +290,179 @@ int create_labels_from_csv_file(char *file_path,
     csv_free(&p);
     return EXIT_SUCCESS;
 }
+
+/*
+ * Flush the current batch of vertices. The ids are inserted into the
+ * temporary id table first (insert_batch_in_temp_table raises an ERROR when
+ * its index check reports a duplicate id), and only then are the vertex
+ * tuples inserted into the label's table through insert_batch.
+ * The callers reset batch_state->num_tuples to 0 after this returns.
+ */
+static void insert_vertex_batch(batch_insert_state *batch_state, char *label_name,
+                                Oid graph_oid, Oid temp_table_relid)
+{
+    insert_batch_in_temp_table(batch_state, graph_oid, temp_table_relid);
+    insert_batch(batch_state, label_name, graph_oid);
+}
+
+/*
+ * Create and populate a temporary table with the vertex ids that are already
+ * present in the graph. This table is used to check whether a new vertex id
+ * generated by the loader is a duplicate.
+ * A unique index is created on the id column to enforce uniqueness.
+ *
+ * The table name is double-quoted in the generated SQL so that the relation
+ * is created under exactly the name GET_TEMP_VERTEX_ID_TABLE() returns. An
+ * unquoted name is case-folded by the SQL parser, and a later lookup of the
+ * exact name would then miss the table for mixed-case graph names.
+ * NOTE(review): like the existing \"%s\" quoting of the schema name, this
+ * assumes graph_name contains no double-quote character.
+ *
+ * We don't need this for loading edges since their ids are generated using
+ * a sequence and are unique.
+ */
+static void setup_temp_table_for_vertex_ids(char *graph_name)
+{
+    char *temp_table_name;
+    char *create_as_query;
+    char *index_query;
+
+    /* Build the name once; the macro allocates a new string on every use */
+    temp_table_name = GET_TEMP_VERTEX_ID_TABLE(graph_name);
+
+    create_as_query = psprintf("CREATE TEMP TABLE IF NOT EXISTS \"%s\" AS "
+                               "SELECT DISTINCT id FROM \"%s\".%s",
+                               temp_table_name, graph_name,
+                               AG_DEFAULT_LABEL_VERTEX);
+
+    index_query = psprintf("CREATE UNIQUE INDEX ON \"%s\" (id)",
+                           temp_table_name);
+
+    if (SPI_connect() != SPI_OK_CONNECT)
+    {
+        ereport(ERROR, (errmsg("could not connect to SPI manager")));
+    }
+
+    /* SPI_execute reports failures with a negative result code */
+    if (SPI_execute(create_as_query, false, 0) < 0)
+    {
+        ereport(ERROR,
+                (errmsg("could not create temporary table \"%s\"",
+                        temp_table_name)));
+    }
+
+    if (SPI_execute(index_query, false, 0) < 0)
+    {
+        ereport(ERROR,
+                (errmsg("could not create unique index on \"%s\"",
+                        temp_table_name)));
+    }
+
+    if (SPI_finish() != SPI_OK_FINISH)
+    {
+        ereport(ERROR, (errmsg("could not disconnect from SPI manager")));
+    }
+
+    /* The strings were allocated before SPI_connect, so free them here */
+    pfree(create_as_query);
+    pfree(index_query);
+    pfree(temp_table_name);
+}
+
+/*
+ * Inserts the batch's id tuples (batch_state->temp_id_slots) into the
+ * temporary id table identified by relid, then inserts each tuple into the
+ * table's indexes to check the uniqueness of the ids.
+ *
+ * Raises an ERROR naming the offending id when the index insertion reports
+ * a conflict. The graph_oid parameter is not used by this function.
+ */
+static void insert_batch_in_temp_table(batch_insert_state *batch_state,
+                                       Oid graph_oid, Oid relid)
+{
+    int i;
+    EState *estate;
+    ResultRelInfo *resultRelInfo;
+    Relation rel;
+    List *result;
+
+    rel = table_open(relid, RowExclusiveLock);
+
+    /* Initialize executor state */
+    estate = CreateExecutorState();
+
+    /* Initialize result relation information */
+    resultRelInfo = makeNode(ResultRelInfo);
+    InitResultRelInfo(resultRelInfo, rel, 1, NULL, estate->es_instrument);
+    estate->es_result_relation_info = resultRelInfo;
+
+    /* Open the indices */
+    ExecOpenIndices(resultRelInfo, false);
+
+    /* Insert the batch into the temporary table */
+    heap_multi_insert(rel, batch_state->temp_id_slots, batch_state->num_tuples,
+                      GetCurrentCommandId(true), 0, NULL);
+
+    for (i = 0; i < batch_state->num_tuples; i++)
+    {
+        /*
+         * The third argument (noDupErr in the PostgreSQL 12 signature) is
+         * true, so a conflict comes back in the returned list instead of
+         * being raised by the index code; we report it ourselves below.
+         */
+        result = ExecInsertIndexTuples(batch_state->temp_id_slots[i],
+                                       estate, true, NULL, NIL);
+        /* Check if the unique constraint is violated */
+        if (list_length(result) != 0)
+        {
+            Datum id_datum;
+            graphid vertex_id;
+            bool isnull;
+
+            id_datum = slot_getattr(batch_state->temp_id_slots[i], 1, &isnull);
+            vertex_id = DATUM_GET_GRAPHID(id_datum);
+
+            /*
+             * graphid is an int64; print it through long long so the format
+             * matches the argument on every platform.
+             */
+            ereport(ERROR, (errmsg("Cannot insert duplicate vertex id: %lld",
+                                    (long long) vertex_id),
+                            errhint("Entry id %lld is already used",
+                                    (long long) get_graphid_entry_id(vertex_id))));
+        }
+    }
+    /* Clean up and close the indices */
+    ExecCloseIndices(resultRelInfo);
+
+    FreeExecutorState(estate);
+    table_close(rel, RowExclusiveLock);
+
+    CommandCounterIncrement();
+}
+
+/*
+ * Allocate and set up the batch insert state used while loading vertices.
+ *
+ * On return *batch_state points to a zeroed batch_insert_state holding
+ * BATCH_SIZE slots shaped like the label's table and BATCH_SIZE slots shaped
+ * like the temporary id table; max_tuples is BATCH_SIZE and num_tuples is 0.
+ */
+static void init_vertex_batch_insert(batch_insert_state **batch_state,
+                                     char *label_name, Oid graph_oid,
+                                     Oid temp_table_relid)
+{
+    batch_insert_state *state;
+    Relation label_rel;
+    Relation temp_id_rel;
+    int slot_idx;
+
+    /* Both relations are opened only to read their tuple descriptors */
+    label_rel = table_open(get_label_relation(label_name, graph_oid),
+                           AccessShareLock);
+    temp_id_rel = table_open(temp_table_relid, AccessShareLock);
+
+    /* Allocate the state and publish it to the caller right away */
+    state = (batch_insert_state *) palloc0(sizeof(batch_insert_state));
+    *batch_state = state;
+
+    state->max_tuples = BATCH_SIZE;
+    state->slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE);
+    state->temp_id_slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE);
+    state->num_tuples = 0;
+
+    /* One slot per relation for every position in the batch */
+    for (slot_idx = 0; slot_idx < BATCH_SIZE; slot_idx++)
+    {
+        state->slots[slot_idx] =
+            MakeSingleTupleTableSlot(RelationGetDescr(label_rel),
+                                     &TTSOpsHeapTuple);
+        state->temp_id_slots[slot_idx] =
+            MakeSingleTupleTableSlot(RelationGetDescr(temp_id_rel),
+                                     &TTSOpsHeapTuple);
+    }
+
+    table_close(label_rel, AccessShareLock);
+    table_close(temp_id_rel, AccessShareLock);
+}
+
+/*
+ * Complete a vertex batch load: insert any tuples still waiting in the
+ * batch state, drop all of its slots, free its memory and set
+ * *batch_state to NULL.
+ */
+static void finish_vertex_batch_insert(batch_insert_state **batch_state,
+                                       char *label_name, Oid graph_oid,
+                                       Oid temp_table_relid)
+{
+    batch_insert_state *state = *batch_state;
+    Relation label_rel;
+    Relation temp_id_rel;
+    int slot_idx;
+
+    /* Flush the last, partially filled batch */
+    if (state->num_tuples > 0)
+    {
+        insert_vertex_batch(state, label_name, graph_oid, temp_table_relid);
+        state->num_tuples = 0;
+    }
+
+    /* Hold both relations open while the slots are being dropped */
+    label_rel = table_open(get_label_relation(label_name, graph_oid),
+                           AccessShareLock);
+    temp_id_rel = table_open(temp_table_relid, AccessShareLock);
+
+    /* Drop every slot that init_vertex_batch_insert created */
+    for (slot_idx = 0; slot_idx < BATCH_SIZE; slot_idx++)
+    {
+        ExecDropSingleTupleTableSlot(state->slots[slot_idx]);
+        ExecDropSingleTupleTableSlot(state->temp_id_slots[slot_idx]);
+    }
+
+    /* Release the arrays and the state itself */
+    pfree(state->slots);
+    pfree(state->temp_id_slots);
+    pfree(state);
+    *batch_state = NULL;
+
+    table_close(label_rel, AccessShareLock);
+    table_close(temp_id_rel, AccessShareLock);
+}
\ No newline at end of file
diff --git a/src/backend/utils/load/age_load.c b/src/backend/utils/load/age_load.c
index 6f6f3d19..57c81210 100644
--- a/src/backend/utils/load/age_load.c
+++ b/src/backend/utils/load/age_load.c
@@ -27,6 +27,9 @@
 
 static agtype_value *csv_value_to_agtype_value(char *csv_val);
 static bool json_validate(text *json);
+static Oid get_or_create_graph(const Name graph_name);
+static int32 get_or_create_label(Oid graph_oid, char *graph_name,
+                                 char *label_name, char label_kind);
 
 agtype *create_empty_agtype(void)
 {
@@ -305,6 +308,34 @@ void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id,
     CommandCounterIncrement();
 }
 
+/*
+ * Insert the first batch_state->num_tuples slots of batch_state->slots into
+ * the table of label label_name in the graph graph_oid using a single
+ * heap_multi_insert() call, then advance the command counter.
+ */
+void insert_batch(batch_insert_state *batch_state, char *label_name,
+                  Oid graph_oid)
+{
+    Oid label_relid = get_label_relation(label_name, graph_oid);
+    Relation rel = table_open(label_relid, RowExclusiveLock);
+    BulkInsertState bulk_state = GetBulkInsertState();
+
+    /* Write the whole batch in one call */
+    heap_multi_insert(rel, batch_state->slots, batch_state->num_tuples,
+                      GetCurrentCommandId(true), 0, bulk_state);
+
+    /* Release the bulk insert state and the relation */
+    FreeBulkInsertState(bulk_state);
+    table_close(rel, RowExclusiveLock);
+
+    CommandCounterIncrement();
+}
+
 PG_FUNCTION_INFO_V1(load_labels_from_file);
 Datum load_labels_from_file(PG_FUNCTION_ARGS)
 {
@@ -343,19 +374,24 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
     id_field_exists = PG_GETARG_BOOL(3);
     load_as_agtype = PG_GETARG_BOOL(4);
 
-
     graph_name_str = NameStr(*graph_name);
     label_name_str = NameStr(*label_name);
+
+    if (strcmp(label_name_str, "") == 0)
+    {
+        label_name_str = AG_DEFAULT_LABEL_VERTEX;
+    }
+
     file_path_str = text_to_cstring(file_path);
 
-    graph_oid = get_graph_oid(graph_name_str);
-    label_id = get_label_id(label_name_str, graph_oid);
+    graph_oid = get_or_create_graph(graph_name);
+    label_id = get_or_create_label(graph_oid, graph_name_str,
+                                   label_name_str, LABEL_KIND_VERTEX);
 
     create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid,
                                 label_name_str, label_id, id_field_exists,
                                 load_as_agtype);
     PG_RETURN_VOID();
-
 }
 
 PG_FUNCTION_INFO_V1(load_edges_from_file);
@@ -397,13 +433,91 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
 
     graph_name_str = NameStr(*graph_name);
     label_name_str = NameStr(*label_name);
+
+    if (strcmp(label_name_str, "") == 0)
+    {
+        label_name_str = AG_DEFAULT_LABEL_EDGE;
+    }
+
     file_path_str = text_to_cstring(file_path);
 
-    graph_oid = get_graph_oid(graph_name_str);
-    label_id = get_label_id(label_name_str, graph_oid);
+    graph_oid = get_or_create_graph(graph_name);
+    label_id = get_or_create_label(graph_oid, graph_name_str,
+                                   label_name_str, LABEL_KIND_EDGE);
 
     create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid,
                                label_name_str, label_id, load_as_agtype);
     PG_RETURN_VOID();
+}
+
+/*
+ * Return the Oid of the graph called graph_name. When no such graph exists
+ * yet, create it, emit a NOTICE and return the Oid of the new graph.
+ */
+static Oid get_or_create_graph(const Name graph_name)
+{
+    char *name_str = NameStr(*graph_name);
+    Oid oid = get_graph_oid(name_str);
 
+    if (!OidIsValid(oid))
+    {
+        /* The graph is missing: create it and tell the user */
+        oid = create_graph_internal(graph_name);
+        ereport(NOTICE,
+                (errmsg("graph \"%s\" has been created", name_str)));
+    }
+
+    return oid;
 }
+
+/*
+ * Helper function to create a label if it does not exist.
+ * Just returns label_id of the label if it already exists with the
+ * requested kind.
+ *
+ * label_kind is LABEL_KIND_VERTEX or LABEL_KIND_EDGE; any value other than
+ * LABEL_KIND_VERTEX is handled as an edge. An ERROR is raised when the label
+ * already exists as the other kind. A new label is created with the default
+ * vertex or edge label of the graph as its parent.
+ */
+static int32 get_or_create_label(Oid graph_oid, char *graph_name,
+                                 char *label_name, char label_kind)
+{
+    bool is_vertex = (label_kind == LABEL_KIND_VERTEX);
+    int32 label_id;
+
+    label_id = get_label_id(label_name, graph_oid);
+
+    /* Check if label exists */
+    if (label_id_is_valid(label_id))
+    {
+        char opposite_label_kind = is_vertex ? LABEL_KIND_EDGE
+                                             : LABEL_KIND_VERTEX;
+
+        /* If it exists, but as another label_kind, throw an error */
+        if (get_label_kind(label_name, graph_oid) == opposite_label_kind)
+        {
+            /* Name the kind the label actually has, not the requested one */
+            ereport(ERROR,
+                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("label \"%s\" already exists as %s label",
+                            label_name, is_vertex ? "edge" : "vertex")));
+        }
+    }
+    else
+    {
+        /* Create a label */
+        RangeVar *rv;
+        List *parent;
+        char *default_label = is_vertex ? AG_DEFAULT_LABEL_VERTEX
+                                        : AG_DEFAULT_LABEL_EDGE;
+
+        rv = get_label_range_var(graph_name, graph_oid, default_label);
+        parent = list_make1(rv);
+
+        create_label(graph_name, label_name, label_kind, parent);
+        label_id = get_label_id(label_name, graph_oid);
+
+        /* Report the kind of label that was really created */
+        ereport(NOTICE,
+                (errmsg("%s \"%s\" has been created",
+                        is_vertex ? "VLabel" : "ELabel", label_name)));
+    }
+
+    return label_id;
+}
\ No newline at end of file
diff --git a/src/include/catalog/ag_label.h b/src/include/catalog/ag_label.h
index 46ea9bc7..b75ed9e5 100644
--- a/src/include/catalog/ag_label.h
+++ b/src/include/catalog/ag_label.h
@@ -74,6 +74,8 @@ Oid get_label_relation(const char *label_name, Oid graph_oid);
 char *get_label_relation_name(const char *label_name, Oid graph_oid);
 Oid get_label_oid(const char *label_name, Oid label_graph);
 char get_label_kind(const char *label_name, Oid label_graph);
+char *get_label_seq_relation_name(const char *label_name);
+
 
 bool label_id_exists(Oid graph_oid, int32 label_id);
 RangeVar *get_label_range_var(char *graph_name, Oid graph_oid,
diff --git a/src/include/commands/graph_commands.h b/src/include/commands/graph_commands.h
index e4d93fc1..d456ef8c 100644
--- a/src/include/commands/graph_commands.h
+++ b/src/include/commands/graph_commands.h
@@ -21,5 +21,6 @@
 #define AG_GRAPH_COMMANDS_H
 
 Datum create_graph(PG_FUNCTION_ARGS);
+Oid create_graph_internal(const Name graph_name);
 
 #endif
diff --git a/src/include/utils/graphid.h b/src/include/utils/graphid.h
index bfb72ee8..407e9a58 100644
--- a/src/include/utils/graphid.h
+++ b/src/include/utils/graphid.h
@@ -36,7 +36,7 @@ typedef int64 graphid;
 
 #define label_id_is_valid(id) (id >= LABEL_ID_MIN && id <= LABEL_ID_MAX)
 
-#define ENTRY_ID_MIN INT64CONST(1)
+#define ENTRY_ID_MIN INT64CONST(0)
 /* 0x0000ffffffffffff */
 #define ENTRY_ID_MAX INT64CONST(281474976710655)
 #define INVALID_ENTRY_ID INT64CONST(0)
diff --git a/src/include/utils/load/ag_load_edges.h b/src/include/utils/load/ag_load_edges.h
index 6bb8ac27..df663b1d 100644
--- a/src/include/utils/load/ag_load_edges.h
+++ b/src/include/utils/load/ag_load_edges.h
@@ -17,6 +17,9 @@
  * under the License.
  */
 
+#include "access/heapam.h"
+#include "utils/load/age_load.h"
+
 #ifndef AG_LOAD_EDGES_H
 #define AG_LOAD_EDGES_H
 
@@ -34,12 +37,13 @@ typedef struct {
     size_t curr_row_length;
     char *graph_name;
     Oid graph_oid;
-    char *object_name;
-    int object_id;
+    char *label_name;
+    int label_id;
+    Oid label_seq_relid;
     char *start_vertex;
     char *end_vertex;
     bool load_as_agtype;
-
+    batch_insert_state *batch_state;
 } csv_edge_reader;
 
 
@@ -47,7 +51,7 @@ void edge_field_cb(void *field, size_t field_len, void *data);
 void edge_row_cb(int delim __attribute__((unused)), void *data);
 
 int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid,
-                                char *object_name, int object_id,
+                                char *label_name, int label_id,
                                 bool load_as_agtype);
 
 #endif /*AG_LOAD_EDGES_H */
diff --git a/src/include/utils/load/ag_load_labels.h b/src/include/utils/load/ag_load_labels.h
index 7d272efb..3a70a5c0 100644
--- a/src/include/utils/load/ag_load_labels.h
+++ b/src/include/utils/load/ag_load_labels.h
@@ -22,6 +22,7 @@
 #define AG_LOAD_LABELS_H
 
 #include "access/heapam.h"
+#include "utils/load/age_load.h"
 
 #define AGE_VERTIX 1
 #define AGE_EDGE 2
@@ -47,10 +48,14 @@ typedef struct {
     size_t curr_row_length;
     char *graph_name;
     Oid graph_oid;
-    char *object_name;
-    int object_id;
+    char *label_name;
+    int label_id;
+    Oid label_seq_relid;
+    Oid temp_table_relid;
     bool id_field_exists;
     bool load_as_agtype;
+    int curr_seq_num;
+    batch_insert_state *batch_state;
 } csv_vertex_reader;
 
 
@@ -58,7 +63,7 @@ void vertex_field_cb(void *field, size_t field_len, void *data);
 void vertex_row_cb(int delim __attribute__((unused)), void *data);
 
 int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid,
-                                char *object_name, int object_id,
+                                char *label_name, int label_id,
                                 bool id_field_exists, bool load_as_agtype);
 
 #endif /* AG_LOAD_LABELS_H */
diff --git a/src/include/utils/load/age_load.h b/src/include/utils/load/age_load.h
index 1c650bb8..b1335581 100644
--- a/src/include/utils/load/age_load.h
+++ b/src/include/utils/load/age_load.h
@@ -24,11 +24,26 @@
 #include "catalog/ag_graph.h"
 #include "catalog/ag_label.h"
 #include "commands/label_commands.h"
+#include "commands/graph_commands.h"
 #include "utils/ag_cache.h"
 
 #ifndef AGE_ENTITY_CREATOR_H
 #define AGE_ENTITY_CREATOR_H
 
+#define TEMP_VERTEX_ID_TABLE_SUFFIX "_ag_vertex_ids"
+#define GET_TEMP_VERTEX_ID_TABLE(graph_name) \
+    psprintf("_%s%s", graph_name, TEMP_VERTEX_ID_TABLE_SUFFIX)
+
+#define BATCH_SIZE 1000
+
+typedef struct
+{
+    TupleTableSlot **slots;         /* tuples bound for the label's table */
+    TupleTableSlot **temp_id_slots; /* id tuples bound for the temp id table */
+    int num_tuples;                 /* slots filled in the current batch */
+    int max_tuples;                 /* batch is flushed once this is reached */
+} batch_insert_state;
+
 agtype* create_empty_agtype(void);
 
 agtype* create_agtype_from_list(char **header, char **fields,
@@ -42,5 +57,7 @@ void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id,
 void insert_edge_simple(Oid graph_oid, char *label_name, graphid edge_id,
                         graphid start_id, graphid end_id,
                         agtype* end_properties);
+void insert_batch(batch_insert_state *batch_state, char *label_name,
+                  Oid graph_oid);
 
 #endif /* AGE_ENTITY_CREATOR_H */

Reply via email to