This is an automated email from the ASF dual-hosted git repository.

jgemignani pushed a commit to branch PG11
in repository https://gitbox.apache.org/repos/asf/age.git


The following commit(s) were added to refs/heads/PG11 by this push:
     new 7ee9156f Revamp age csv loader (#2044) (#2068)
7ee9156f is described below

commit 7ee9156f2b00d9932d5a9db5274673a7121604a0
Author: Muhammad Taha Naveed <[email protected]>
AuthorDate: Thu Aug 22 19:19:23 2024 +0500

    Revamp age csv loader (#2044) (#2068)
    
    * Allow 0 as entry_id
    
    - No regression tests were impacted by this change.
    
    * Use batch inserts to improve performance
    
    - Changed heap_insert to heap_multi_insert since it is faster than
      calling heap_insert() in a loop. When multiple tuples can be inserted
      on a single page, just a single WAL record covering all of them is
      written, and the page only needs to be locked/unlocked once.
    
    - BATCH_SIZE is set to 1000, which is the number of tuples to insert in
      a single batch. This number was chosen after some experimentation.
    
    - Change some of the field names to avoid confusion.
    
    * Use sequence for generating ids for edge and vertex
    
    - The sequence is not used if id_field_exists is true in the
      load_labels_from_file function, since the entry id is present in the
      csv.
    
    * Add function to create temporary table for ids
    
    - Created a temporary table and populated it with already generated
      vertex ids. A unique index is created on the id column to ensure that
      new ids generated (using entry id from csv) are unique.
    
    * Insert generated ids in the temporary table to enforce uniqueness
    
    - Insert ids in the temporary table and also update index to
      enforce uniqueness.
    - If the entry id provided in the CSV is greater than the current
      sequence value, the sequence value is updated to match the entry ID.
      For example:
      Suppose the current sequence value is 1, and the CSV entry ID is 2.
      If we use 2 but do not update the sequence to 2, the next time the
      CREATE clause is used, 2 will be returned by the sequence as an
      entry id, resulting in a duplicate.
    - Update batch functions
    
    * Add functions to create graph and label automatically
    
    - These functions will check existence of graph and label, and create
      them if they don't exist.
    
    * Add regression tests
---
 regress/expected/age_load.out           | 139 +++++++++++-----
 regress/sql/age_load.sql                |  74 ++++++++-
 src/backend/catalog/ag_graph.c          |   4 +-
 src/backend/catalog/ag_label.c          |   5 +
 src/backend/commands/graph_commands.c   |  34 ++--
 src/backend/utils/load/ag_load_edges.c  | 120 ++++++++++----
 src/backend/utils/load/ag_load_labels.c | 283 +++++++++++++++++++++++++++++---
 src/backend/utils/load/age_load.c       | 137 ++++++++++++++--
 src/include/catalog/ag_graph.h          |   1 +
 src/include/catalog/ag_label.h          |   2 +
 src/include/commands/graph_commands.h   |   1 +
 src/include/utils/graphid.h             |   5 +-
 src/include/utils/load/ag_load_edges.h  |  16 +-
 src/include/utils/load/ag_load_labels.h |  15 +-
 src/include/utils/load/age_load.h       |  19 +++
 15 files changed, 719 insertions(+), 136 deletions(-)

diff --git a/regress/expected/age_load.out b/regress/expected/age_load.out
index 8635a499..8f216341 100644
--- a/regress/expected/age_load.out
+++ b/regress/expected/age_load.out
@@ -19,6 +19,7 @@
 \! cp -r regress/age_load/data regress/instance/data/age_load
 LOAD 'age';
 SET search_path TO ag_catalog;
+-- Create a country using CREATE clause
 SELECT create_graph('agload_test_graph');
 NOTICE:  graph "agload_test_graph" has been created
  create_graph 
@@ -26,34 +27,79 @@ NOTICE:  graph "agload_test_graph" has been created
  
 (1 row)
 
-SELECT create_vlabel('agload_test_graph','Country');
-NOTICE:  VLabel "Country" has been created
- create_vlabel 
----------------
- 
+SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1}) 
RETURN n$$) as (n agtype);
+                                        n                                      
   
+----------------------------------------------------------------------------------
+ {"id": 844424930131969, "label": "Country", "properties": {"__id__": 
1}}::vertex
 (1 row)
 
+--
+-- Load countries with id
+--
 SELECT load_labels_from_file('agload_test_graph', 'Country',
-    'age_load/countries.csv');
+    'age_load/countries.csv', true);
  load_labels_from_file 
 -----------------------
  
 (1 row)
 
-SELECT create_vlabel('agload_test_graph','City');
-NOTICE:  VLabel "City" has been created
- create_vlabel 
----------------
- 
+-- A temporary table should have been created with 54 ids; 1 from CREATE and 
53 from file
+SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids";
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Sequence should be equal to max entry id i.e. 248
+SELECT currval('agload_test_graph."Country_id_seq"')=248;
+ ?column? 
+----------
+ t
 (1 row)
 
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+    'age_load/countries.csv', true);
+ERROR:  Cannot insert duplicate vertex id: 844424930131970
+HINT:  Entry id 2 is already used
+--
+-- Load cities with id
+--
+-- Should create City label automatically and load cities
 SELECT load_labels_from_file('agload_test_graph', 'City',
-    'age_load/cities.csv');
+    'age_load/cities.csv', true);
+NOTICE:  VLabel "City" has been created
  load_labels_from_file 
 -----------------------
  
 (1 row)
 
+-- Temporary table should have 54+72485 rows now
+SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids";
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Sequence should be equal to max entry id i.e. 146941
+SELECT currval('agload_test_graph."City_id_seq"')=146941;
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'City',
+    'age_load/cities.csv', true);
+ERROR:  Cannot insert duplicate vertex id: 1125899906842777
+HINT:  Entry id 153 is already used
+--
+-- Load edges -- Connects cities to countries
+--
+-- Should error out for using vertex label
+SELECT load_edges_from_file('agload_test_graph', 'Country',
+    'age_load/edges.csv');
+ERROR:  label "Country" already exists as edge label
 SELECT create_elabel('agload_test_graph','has_city');
 NOTICE:  ELabel "has_city" has been created
  create_elabel 
@@ -68,6 +114,17 @@ SELECT load_edges_from_file('agload_test_graph', 'has_city',
  
 (1 row)
 
+-- Sequence should be equal to number of edges loaded i.e. 72485
+SELECT currval('agload_test_graph."has_city_id_seq"')=72485;
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Should error out for using edge label
+SELECT load_labels_from_file('agload_test_graph', 'has_city',
+     'age_load/cities.csv');
+ERROR:  label "has_city" already exists as vertex label
 SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type
 FROM information_schema.tables
 WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC;
@@ -83,7 +140,7 @@ WHERE table_schema = 'agload_test_graph' ORDER BY table_name 
ASC;
 SELECT COUNT(*) FROM agload_test_graph."Country";
  count 
 -------
-    53
+    54
 (1 row)
 
 SELECT COUNT(*) FROM agload_test_graph."City";
@@ -101,7 +158,7 @@ SELECT COUNT(*) FROM agload_test_graph."has_city";
 SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) RETURN n$$) as (n 
agtype);
  count 
 -------
- 72538
+ 72539
 (1 row)
 
 SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN 
e$$) as (n agtype);
@@ -110,6 +167,17 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH 
(a)-[e]->(b) RETURN e$$
  72485
 (1 row)
 
+--
+-- Load countries and cities without id
+--
+-- Should load countries in Country label without error since it should use 
sequence now
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+    'age_load/countries.csv', false);
+ load_labels_from_file 
+-----------------------
+ 
+(1 row)
+
 SELECT create_vlabel('agload_test_graph','Country2');
 NOTICE:  VLabel "Country2" has been created
  create_vlabel 
@@ -153,6 +221,7 @@ SELECT COUNT(*) FROM agload_test_graph."City2";
 SELECT id FROM agload_test_graph."Country" LIMIT 10;
        id        
 -----------------
+ 844424930131969
  844424930131970
  844424930131971
  844424930131974
@@ -162,7 +231,6 @@ SELECT id FROM agload_test_graph."Country" LIMIT 10;
  844424930131996
  844424930132002
  844424930132023
- 844424930132025
 (10 rows)
 
 SELECT id FROM agload_test_graph."Country2" LIMIT 10;
@@ -180,13 +248,16 @@ SELECT id FROM agload_test_graph."Country2" LIMIT 10;
  1688849860263946
 (10 rows)
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
       id(n)      |  n.name   | n.iso2 
 -----------------+-----------+--------
  844424930131990 | "Belgium" | "BE"
-(1 row)
+ 844424930132223 | "Belgium" | "BE"
+(2 rows)
 
+-- Should return 1 row
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
       id(n)       |  n.name   | n.iso2 
@@ -194,13 +265,16 @@ SELECT * FROM cypher('agload_test_graph', 
$$MATCH(n:Country2 {iso2 : 'BE'})
  1688849860263942 | "Belgium" | "BE"
 (1 row)
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
       id(n)      |  n.name   | n.iso2 
 -----------------+-----------+--------
  844424930131983 | "Austria" | "AT"
-(1 row)
+ 844424930132221 | "Austria" | "AT"
+(2 rows)
 
+-- Should return 1 row
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
       id(n)       |  n.name   | n.iso2 
@@ -208,15 +282,17 @@ SELECT * FROM cypher('agload_test_graph', 
$$MATCH(n:Country2 {iso2 : 'AT'})
  1688849860263940 | "Austria" | "AT"
 (1 row)
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$
     MATCH (u:Country {region : "Europe"})
     WHERE u.name =~ 'Cro.*'
-    RETURN u.name, u.region
-$$) AS (result_1 agtype, result_2 agtype);
- result_1  | result_2 
------------+----------
- "Croatia" | "Europe"
-(1 row)
+    RETURN id(u), u.name, u.region
+$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype);
+      id(u)      | result_1  | result_2 
+-----------------+-----------+----------
+ 844424930132023 | "Croatia" | "Europe"
+ 844424930132226 | "Croatia" | "Europe"
+(2 rows)
 
 SELECT drop_graph('agload_test_graph', true);
 NOTICE:  drop cascades to 7 other objects
@@ -236,22 +312,11 @@ NOTICE:  graph "agload_test_graph" has been dropped
 --
 -- Test property type conversion
 --
-SELECT create_graph('agload_conversion');
-NOTICE:  graph "agload_conversion" has been created
- create_graph 
---------------
- 
-(1 row)
-
 -- vertex: load as agtype
-SELECT create_vlabel('agload_conversion','Person1');
-NOTICE:  VLabel "Person1" has been created
- create_vlabel 
----------------
- 
-(1 row)
-
+-- Should create graph and label automatically
 SELECT load_labels_from_file('agload_conversion', 'Person1', 
'age_load/conversion_vertices.csv', true, true);
+NOTICE:  graph "agload_conversion" has been created
+NOTICE:  VLabel "Person1" has been created
  load_labels_from_file 
 -----------------------
  
diff --git a/regress/sql/age_load.sql b/regress/sql/age_load.sql
index cee34f59..0d76654f 100644
--- a/regress/sql/age_load.sql
+++ b/regress/sql/age_load.sql
@@ -22,20 +22,65 @@
 LOAD 'age';
 
 SET search_path TO ag_catalog;
+
+-- Create a country using CREATE clause
 SELECT create_graph('agload_test_graph');
 
-SELECT create_vlabel('agload_test_graph','Country');
+SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1}) 
RETURN n$$) as (n agtype);
+
+--
+-- Load countries with id
+--
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+    'age_load/countries.csv', true);
+
+-- A temporary table should have been created with 54 ids; 1 from CREATE and 
53 from file
+SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids";
+
+-- Sequence should be equal to max entry id i.e. 248
+SELECT currval('agload_test_graph."Country_id_seq"')=248;
+
+-- Should error out on loading the same file again due to duplicate id
 SELECT load_labels_from_file('agload_test_graph', 'Country',
-    'age_load/countries.csv');
+    'age_load/countries.csv', true);
+
+--
+-- Load cities with id
+--
 
-SELECT create_vlabel('agload_test_graph','City');
+-- Should create City label automatically and load cities
 SELECT load_labels_from_file('agload_test_graph', 'City',
-    'age_load/cities.csv');
+    'age_load/cities.csv', true);
+
+-- Temporary table should have 54+72485 rows now
+SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids";
+
+-- Sequence should be equal to max entry id i.e. 146941
+SELECT currval('agload_test_graph."City_id_seq"')=146941;
+
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'City',
+    'age_load/cities.csv', true);
+
+--
+-- Load edges -- Connects cities to countries
+--
+
+-- Should error out for using vertex label
+SELECT load_edges_from_file('agload_test_graph', 'Country',
+    'age_load/edges.csv');
 
 SELECT create_elabel('agload_test_graph','has_city');
 SELECT load_edges_from_file('agload_test_graph', 'has_city',
      'age_load/edges.csv');
 
+-- Sequence should be equal to number of edges loaded i.e. 72485
+SELECT currval('agload_test_graph."has_city_id_seq"')=72485;
+
+-- Should error out for using edge label
+SELECT load_labels_from_file('agload_test_graph', 'has_city',
+     'age_load/cities.csv');
+
 SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type
 FROM information_schema.tables
 WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC;
@@ -48,6 +93,14 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) 
RETURN n$$) as (n ag
 
 SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN 
e$$) as (n agtype);
 
+--
+-- Load countries and cities without id
+--
+
+-- Should load countries in Country label without error since it should use 
sequence now
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+    'age_load/countries.csv', false);
+
 SELECT create_vlabel('agload_test_graph','Country2');
 SELECT load_labels_from_file('agload_test_graph', 'Country2',
                              'age_load/countries.csv', false);
@@ -62,31 +115,36 @@ SELECT COUNT(*) FROM agload_test_graph."City2";
 SELECT id FROM agload_test_graph."Country" LIMIT 10;
 SELECT id FROM agload_test_graph."Country2" LIMIT 10;
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
+-- Should return 1 row
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
+-- Should return 1 row
 SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'})
     RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, 
"n.iso2" agtype);
 
+-- Should return 2 rows for Country with same properties, but different ids
 SELECT * FROM cypher('agload_test_graph', $$
     MATCH (u:Country {region : "Europe"})
     WHERE u.name =~ 'Cro.*'
-    RETURN u.name, u.region
-$$) AS (result_1 agtype, result_2 agtype);
+    RETURN id(u), u.name, u.region
+$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype);
 
 SELECT drop_graph('agload_test_graph', true);
 
 --
 -- Test property type conversion
 --
-SELECT create_graph('agload_conversion');
 
 -- vertex: load as agtype
-SELECT create_vlabel('agload_conversion','Person1');
+
+-- Should create graph and label automatically
 SELECT load_labels_from_file('agload_conversion', 'Person1', 
'age_load/conversion_vertices.csv', true, true);
 SELECT * FROM cypher('agload_conversion', $$ MATCH (n:Person1) RETURN 
properties(n) $$) as (a agtype);
 
diff --git a/src/backend/catalog/ag_graph.c b/src/backend/catalog/ag_graph.c
index c1e53d6a..7000d80b 100644
--- a/src/backend/catalog/ag_graph.c
+++ b/src/backend/catalog/ag_graph.c
@@ -36,8 +36,6 @@
 #include "catalog/ag_graph.h"
 #include "utils/ag_cache.h"
 
-static Oid get_graph_namespace(const char *graph_name);
-
 // INSERT INTO ag_catalog.ag_graph VALUES (graph_name, nsp_id)
 Oid insert_graph(const Name graph_name, const Oid nsp_id)
 {
@@ -160,7 +158,7 @@ Oid get_graph_oid(const char *graph_name)
         return InvalidOid;
 }
 
-static Oid get_graph_namespace(const char *graph_name)
+Oid get_graph_namespace(const char *graph_name)
 {
     graph_cache_data *cache_data;
 
diff --git a/src/backend/catalog/ag_label.c b/src/backend/catalog/ag_label.c
index 41a11abe..e60a1d56 100644
--- a/src/backend/catalog/ag_label.c
+++ b/src/backend/catalog/ag_label.c
@@ -185,6 +185,11 @@ char get_label_kind(const char *label_name, Oid 
label_graph)
     }
 }
 
+char *get_label_seq_relation_name(const char *label_name)
+{
+    return psprintf("%s_id_seq", label_name);
+}
+
 PG_FUNCTION_INFO_V1(_label_name);
 
 /*
diff --git a/src/backend/commands/graph_commands.c 
b/src/backend/commands/graph_commands.c
index 4216652b..e912518a 100644
--- a/src/backend/commands/graph_commands.c
+++ b/src/backend/commands/graph_commands.c
@@ -43,6 +43,7 @@
 #include "catalog/ag_label.h"
 #include "commands/label_commands.h"
 #include "utils/graphid.h"
+#include "commands/graph_commands.h"
 #include "utils/name_validation.h"
 
 /*
@@ -61,10 +62,7 @@ PG_FUNCTION_INFO_V1(create_graph);
 /* function that is evoked for creating a graph */
 Datum create_graph(PG_FUNCTION_ARGS)
 {
-    char *graph;
     Name graph_name;
-    char *graph_name_str;
-    Oid nsp_id;
 
     //if no argument is passed with the function, graph name cannot be null
     if (PG_ARGISNULL(0))
@@ -76,6 +74,23 @@ Datum create_graph(PG_FUNCTION_ARGS)
     //gets graph name as function argument
     graph_name = PG_GETARG_NAME(0);  
 
+    create_graph_internal(graph_name);
+
+    ereport(NOTICE,
+            (errmsg("graph \"%s\" has been created", NameStr(*graph_name))));
+
+    /* 
+     * According to postgres specification of c-language functions
+     * if function returns void this is the syntax.
+     */
+    PG_RETURN_VOID(); 
+}
+
+Oid create_graph_internal(const Name graph_name)
+{
+    Oid nsp_id;
+    char *graph_name_str;
+
     graph_name_str = NameStr(*graph_name);
 
     //checking if the name of the graph falls under the pre-decided graph 
naming conventions(regex)
@@ -101,16 +116,11 @@ Datum create_graph(PG_FUNCTION_ARGS)
     //Increment the Command counter before create the generic labels.
     CommandCounterIncrement();
 
-    //Create the default label tables
-    graph = graph_name->data;
-    create_label(graph, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, NIL);
-    create_label(graph, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL);
+    /* Create the default label tables */
+    create_label(graph_name_str, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, 
NIL);
+    create_label(graph_name_str, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL);
 
-    ereport(NOTICE,
-            (errmsg("graph \"%s\" has been created", NameStr(*graph_name))));
-
-    //according to postgres specification of c-language functions if function 
returns void this is the syntax
-    PG_RETURN_VOID(); 
+    return nsp_id;
 }
 
 static Oid create_schema_for_graph(const Name graph_name)
diff --git a/src/backend/utils/load/ag_load_edges.c 
b/src/backend/utils/load/ag_load_edges.c
index d6ae29ff..65ce9366 100644
--- a/src/backend/utils/load/ag_load_edges.c
+++ b/src/backend/utils/load/ag_load_edges.c
@@ -16,21 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include "postgres.h"
 
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <stdlib.h>
-#include <unistd.h>
-
-#include "utils/load/csv.h"
 #include "utils/load/ag_load_edges.h"
-#include "utils/load/age_load.h"
+#include "utils/load/csv.h"
 
+static void init_edge_batch_insert(batch_insert_state **batch_state,
+                            char *label_name, Oid graph_oid);
+static void finish_edge_batch_insert(batch_insert_state **batch_state,
+                              char *label_name, Oid graph_oid);
 
 void edge_field_cb(void *field, size_t field_len, void *data)
 {
-
     csv_edge_reader *cr = (csv_edge_reader*)data;
     if (cr->error)
     {
@@ -61,8 +58,8 @@ void edge_field_cb(void *field, size_t field_len, void *data)
 // Parser calls this function when it detects end of a row
 void edge_row_cb(int delim __attribute__((unused)), void *data)
 {
-
     csv_edge_reader *cr = (csv_edge_reader*)data;
+    batch_insert_state *batch_state = cr->batch_state;
 
     size_t i, n_fields;
     int64 start_id_int;
@@ -73,9 +70,12 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
     graphid end_vertex_graph_id;
     int end_vertex_type_id;
 
-    graphid object_graph_id;
+    graphid edge_id;
+    int64 entry_id;
 
-    agtype* props = NULL;
+    Datum values[4];
+    bool isnull[4] = {false, false, false, false};
+    HeapTuple tuple;
 
     n_fields = cr->cur_field;
 
@@ -94,24 +94,38 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
     }
     else
     {
-        object_graph_id = make_graphid(cr->object_id, (int64)cr->row);
+        entry_id = nextval_internal(cr->label_seq_relid, true);
+        edge_id = make_graphid(cr->label_id, entry_id);
 
         start_id_int = strtol(cr->fields[0], NULL, 10);
-        start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_id);
+        start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid);
         end_id_int = strtol(cr->fields[2], NULL, 10);
-        end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_id);
+        end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_oid);
 
         start_vertex_graph_id = make_graphid(start_vertex_type_id, 
start_id_int);
         end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
 
-        props = create_agtype_from_list_i(cr->header, cr->fields,
-                                          n_fields, 4, cr->load_as_agtype);
+        /* Fill the values */
+        values[0] = GRAPHID_GET_DATUM(edge_id);
+        values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
+        values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
+        values[3] = AGTYPE_P_GET_DATUM(
+                                create_agtype_from_list_i(
+                                    cr->header, cr->fields,
+                                    n_fields, 4, cr->load_as_agtype));
+
+        /* Create and insert the tuple into the batch state */
+        tuple = heap_form_tuple(batch_state->desc, values, isnull);
+        batch_state->buffered_tuples[batch_state->num_tuples] = tuple;
 
-        insert_edge_simple(cr->graph_id, cr->object_name,
-                           object_graph_id, start_vertex_graph_id,
-                           end_vertex_graph_id, props);
+        batch_state->num_tuples++;
 
-        pfree(props);
+        if (batch_state->num_tuples >= batch_state->max_tuples)
+        {
+            /* Insert the batch when it is full (i.e. BATCH_SIZE) */
+            insert_batch(batch_state, cr->label_name, cr->graph_oid);
+            batch_state->num_tuples = 0;
+        }
     }
 
     for (i = 0; i < n_fields; ++i)
@@ -124,7 +138,6 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
         ereport(NOTICE,(errmsg("THere is some error")));
     }
 
-
     cr->cur_field = 0;
     cr->curr_row_length = 0;
     cr->row += 1;
@@ -156,9 +169,9 @@ static int is_term(unsigned char c)
 
 int create_edges_from_csv_file(char *file_path,
                                char *graph_name,
-                               Oid graph_id,
-                               char *object_name,
-                               int object_id,
+                               Oid graph_oid,
+                               char *label_name,
+                               int label_id,
                                bool load_as_agtype)
 {
 
@@ -168,6 +181,8 @@ int create_edges_from_csv_file(char *file_path,
     size_t bytes_read;
     unsigned char options = 0;
     csv_edge_reader cr;
+    char *label_seq_name;
+    Oid nsp_id;
 
     if (csv_init(&p, options) != 0)
     {
@@ -185,6 +200,8 @@ int create_edges_from_csv_file(char *file_path,
                 (errmsg("Failed to open %s\n", file_path)));
     }
 
+    nsp_id = get_graph_namespace(graph_name);
+    label_seq_name = get_label_seq_relation_name(label_name);
 
     memset((void*)&cr, 0, sizeof(csv_edge_reader));
     cr.alloc = 128;
@@ -193,11 +210,15 @@ int create_edges_from_csv_file(char *file_path,
     cr.header_row_length = 0;
     cr.curr_row_length = 0;
     cr.graph_name = graph_name;
-    cr.graph_id = graph_id;
-    cr.object_name = object_name;
-    cr.object_id = object_id;
+    cr.graph_oid = graph_oid;
+    cr.label_name = label_name;
+    cr.label_id = label_id;
+    cr.label_seq_relid = get_relname_relid(label_seq_name, nsp_id);
     cr.load_as_agtype = load_as_agtype;
 
+    /* Initialize the batch insert state */
+    init_edge_batch_insert(&cr.batch_state, label_name, graph_oid);
+
     while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
     {
         if (csv_parse(&p, buf, bytes_read, edge_field_cb,
@@ -210,6 +231,9 @@ int create_edges_from_csv_file(char *file_path,
 
     csv_fini(&p, edge_field_cb, edge_row_cb, &cr);
 
+    /* Finish any remaining batch inserts */
+    finish_edge_batch_insert(&cr.batch_state, label_name, graph_oid);
+
     if (ferror(fp))
     {
         ereport(ERROR, (errmsg("Error while reading file %s\n", file_path)));
@@ -221,3 +245,43 @@ int create_edges_from_csv_file(char *file_path,
     csv_free(&p);
     return EXIT_SUCCESS;
 }
+
+/*
+ * Initialize the batch insert state for edges.
+ */
+static void init_edge_batch_insert(batch_insert_state **batch_state,
+                            char *label_name, Oid graph_oid)
+{
+    Relation relation;
+
+    // Open a temporary relation to get the tuple descriptor
+    relation = heap_open(get_label_relation(label_name, graph_oid), 
AccessShareLock);
+
+    // Initialize the batch insert state
+    *batch_state = palloc(sizeof(batch_insert_state));
+    (*batch_state)->max_tuples = BATCH_SIZE;
+    (*batch_state)->buffered_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple));
+    (*batch_state)->desc = CreateTupleDescCopy(RelationGetDescr(relation));
+    (*batch_state)->num_tuples = 0;
+
+    heap_close(relation, AccessShareLock);
+}
+
+/*
+ * Finish the batch insert for edges. Insert the
+ * remaining tuples in the batch state and clean up.
+ */
+static void finish_edge_batch_insert(batch_insert_state **batch_state,
+                              char *label_name, Oid graph_oid)
+{
+    if ((*batch_state)->num_tuples > 0)
+    {
+        insert_batch(*batch_state, label_name, graph_oid);
+        (*batch_state)->num_tuples = 0;
+    }
+
+    // Clean up batch state
+    pfree((*batch_state)->buffered_tuples);
+    pfree(*batch_state);
+    *batch_state = NULL;
+}
\ No newline at end of file
diff --git a/src/backend/utils/load/ag_load_labels.c 
b/src/backend/utils/load/ag_load_labels.c
index 6f79071d..194a6dc8 100644
--- a/src/backend/utils/load/ag_load_labels.c
+++ b/src/backend/utils/load/ag_load_labels.c
@@ -17,11 +17,28 @@
  * under the License.
  */
 #include "postgres.h"
+#include "executor/spi.h"
+#include "catalog/namespace.h"
+#include "executor/executor.h"
+#include "access/xact.h"
+#include "executor/tuptable.h"
+#include "utils/rel.h"
 
 #include "utils/load/ag_load_labels.h"
-#include "utils/load/age_load.h"
 #include "utils/load/csv.h"
 
+static void setup_temp_table_for_vertex_ids(char *graph_name);
+static void insert_batch_in_temp_table(batch_insert_state *batch_state,
+                                       Oid graph_oid, Oid relid);
+static void init_vertex_batch_insert(batch_insert_state **batch_state,
+                                     char *label_name, Oid graph_oid,
+                                     Oid temp_table_relid);
+static void finish_vertex_batch_insert(batch_insert_state **batch_state,
+                                       char *label_name, Oid graph_oid,
+                                       Oid temp_table_relid);
+static void insert_vertex_batch(batch_insert_state *batch_state, char 
*label_name,
+                                Oid graph_oid, Oid temp_table_relid);
+
 void vertex_field_cb(void *field, size_t field_len, void *data)
 {
 
@@ -55,16 +72,20 @@ void vertex_field_cb(void *field, size_t field_len, void 
*data)
 
 void vertex_row_cb(int delim __attribute__((unused)), void *data)
 {
-
     csv_vertex_reader *cr = (csv_vertex_reader*)data;
-    agtype *props = NULL;
+    batch_insert_state *batch_state = cr->batch_state;
     size_t i, n_fields;
-    graphid object_graph_id;
-    int64 label_id_int;
+    graphid vertex_id;
+    int64 entry_id;
+    Datum values[2];
+    bool nulls[2] = {false, false};
+    Datum temp_table_values[1];
+    bool temp_table_nulls[1] = {false};
+    HeapTuple tuple;
+    HeapTuple temp_table_tuple;
 
     n_fields = cr->cur_field;
 
-
     if (cr->row == 0)
     {
         cr->header_num = cr->cur_field;
@@ -82,36 +103,61 @@ void vertex_row_cb(int delim __attribute__((unused)), void 
*data)
     {
         if (cr->id_field_exists)
         {
-            label_id_int = strtol(cr->fields[0], NULL, 10);
+            entry_id = strtol(cr->fields[0], NULL, 10);
+            if (entry_id > cr->curr_seq_num)
+            {
+                DirectFunctionCall2(setval_oid,
+                                    ObjectIdGetDatum(cr->label_seq_relid),
+                                    Int64GetDatum(entry_id));
+                cr->curr_seq_num = entry_id;
+            }
         }
         else
         {
-            label_id_int = (int64)cr->row;
+            entry_id = nextval_internal(cr->label_seq_relid, true);
         }
 
-        object_graph_id = make_graphid(cr->object_id, label_id_int);
+        vertex_id = make_graphid(cr->label_id, entry_id);
 
-        props = create_agtype_from_list(cr->header, cr->fields,
-                                        n_fields, label_id_int,
-                                        cr->load_as_agtype);
-        insert_vertex_simple(cr->graph_id, cr->object_name,
-                             object_graph_id, props);
-        pfree(props);
-    }
+        /* Fill the values */
+        values[0] = GRAPHID_GET_DATUM(vertex_id);
+        values[1] = AGTYPE_P_GET_DATUM(
+                                create_agtype_from_list(cr->header, cr->fields,
+                                                        n_fields, entry_id,
+                                                        cr->load_as_agtype));
+
+        temp_table_values[0] = GRAPHID_GET_DATUM(vertex_id);
+
+        /* Create the tuple */
+        tuple = heap_form_tuple(batch_state->desc, values, nulls);
+        temp_table_tuple = heap_form_tuple(batch_state->id_desc, 
temp_table_values,
+                                           temp_table_nulls);
+
+        /* Store the tuple in the batch state */
+        batch_state->buffered_tuples[batch_state->num_tuples] = tuple;
+        batch_state->buffered_id_tuples[batch_state->num_tuples] = 
temp_table_tuple;
+
+        batch_state->num_tuples++;
 
+        if (batch_state->num_tuples >= batch_state->max_tuples)
+        {
+            /* Insert the batch when it is full (i.e. BATCH_SIZE) */
+            insert_vertex_batch(batch_state, cr->label_name, cr->graph_oid,
+                                cr->temp_table_relid);
+            batch_state->num_tuples = 0;
+        }
+    }
 
     for (i = 0; i < n_fields; ++i)
     {
         free(cr->fields[i]);
     }
 
-
     if (cr->error)
     {
         ereport(NOTICE,(errmsg("THere is some error")));
     }
 
-
     cr->cur_field = 0;
     cr->curr_row_length = 0;
     cr->row += 1;
@@ -143,9 +189,9 @@ static int is_term(unsigned char c)
 
 int create_labels_from_csv_file(char *file_path,
                                 char *graph_name,
-                                Oid graph_id,
-                                char *object_name,
-                                int object_id,
+                                Oid graph_oid,
+                                char *label_name,
+                                int label_id,
                                 bool id_field_exists,
                                 bool load_as_agtype)
 {
@@ -156,6 +202,9 @@ int create_labels_from_csv_file(char *file_path,
     size_t bytes_read;
     unsigned char options = 0;
     csv_vertex_reader cr;
+    char *label_seq_name;
+    Oid temp_table_relid;
+    Oid nsp_id;
 
     if (csv_init(&p, options) != 0)
     {
@@ -163,6 +212,13 @@ int create_labels_from_csv_file(char *file_path,
                 (errmsg("Failed to initialize csv parser\n")));
     }
 
+    temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
+    if (!OidIsValid(temp_table_relid))
+    {
+        setup_temp_table_for_vertex_ids(graph_name);
+        temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
+    }
+
     csv_set_space_func(&p, is_space);
     csv_set_term_func(&p, is_term);
 
@@ -173,6 +229,8 @@ int create_labels_from_csv_file(char *file_path,
                 (errmsg("Failed to open %s\n", file_path)));
     }
 
+    nsp_id = get_graph_namespace(graph_name);
+    label_seq_name = get_label_seq_relation_name(label_name);
 
     memset((void*)&cr, 0, sizeof(csv_vertex_reader));
 
@@ -182,13 +240,29 @@ int create_labels_from_csv_file(char *file_path,
     cr.header_row_length = 0;
     cr.curr_row_length = 0;
     cr.graph_name = graph_name;
-    cr.graph_id = graph_id;
-    cr.object_name = object_name;
-    cr.object_id = object_id;
+    cr.graph_oid = graph_oid;
+    cr.label_name = label_name;
+    cr.label_id = label_id;
     cr.id_field_exists = id_field_exists;
+    cr.label_seq_relid = get_relname_relid(label_seq_name, nsp_id);
     cr.load_as_agtype = load_as_agtype;
+    cr.temp_table_relid = temp_table_relid;
+    
+    if (cr.id_field_exists)
+    {
+        /*
+         * Set the curr_seq_num since we will need it to compare with
+         * incoming entry_id.
+         * 
+         * We cant use currval because it will error out if nextval was
+         * not called before in the session.
+         */
+        cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true);
+    }
 
-
+    /* Initialize the batch insert state */
+    init_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
+                             cr.temp_table_relid);
 
     while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
     {
@@ -202,6 +276,10 @@ int create_labels_from_csv_file(char *file_path,
 
     csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr);
 
+    /* Finish any remaining batch inserts */
+    finish_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
+                               cr.temp_table_relid);
+
     if (ferror(fp))
     {
         ereport(ERROR, (errmsg("Error while reading file %s\n",
@@ -214,3 +292,158 @@ int create_labels_from_csv_file(char *file_path,
     csv_free(&p);
     return EXIT_SUCCESS;
 }
+
+static void insert_vertex_batch(batch_insert_state *batch_state, char *label_name,
+                                Oid graph_oid, Oid temp_table_relid)
+{
+    insert_batch_in_temp_table(batch_state, graph_oid, temp_table_relid);
+    insert_batch(batch_state, label_name, graph_oid);
+}
+
+/*
+ * Create and populate a temporary table with vertex ids that are already
+ * present in the graph. This table will be used to check if the new vertex
+ * id generated by loader is a duplicate.
+ * Unique index is created to enforce uniqueness of the ids.
+ * 
+ * We dont need this for loading edges since the ids are generated using
+ * sequence and are unique.
+ */ 
+static void setup_temp_table_for_vertex_ids(char *graph_name)
+{
+    char *create_as_query;
+    char *index_query;
+
+    create_as_query = psprintf("CREATE TEMP TABLE IF NOT EXISTS %s AS "
+                               "SELECT DISTINCT id FROM \"%s\".%s",
+                                GET_TEMP_VERTEX_ID_TABLE(graph_name), graph_name,
+                                AG_DEFAULT_LABEL_VERTEX);
+
+    index_query = psprintf("CREATE UNIQUE INDEX ON %s (id)",
+                            GET_TEMP_VERTEX_ID_TABLE(graph_name));
+    SPI_connect();
+    SPI_execute(create_as_query, false, 0);
+    SPI_execute(index_query, false, 0);
+
+    SPI_finish();
+
+    pfree(create_as_query);
+    pfree(index_query);
+}
+
+/*
+ * Inserts batch of tuples into the temporary table.
+ * This function also updates the index to check for
+ * uniqueness of the ids.
+ */
+static void insert_batch_in_temp_table(batch_insert_state *batch_state,
+                                       Oid graph_oid, Oid relid)
+{
+    int i;
+    EState *estate;
+    ResultRelInfo *resultRelInfo;
+    Relation rel;
+    List *result;
+
+    rel = heap_open(relid, RowExclusiveLock);
+
+    /* Initialize executor state */
+    estate = CreateExecutorState();
+
+    /* Initialize result relation information */
+    resultRelInfo = makeNode(ResultRelInfo);
+    InitResultRelInfo(resultRelInfo, rel, 1, NULL, estate->es_instrument);
+    estate->es_result_relation_info = resultRelInfo;
+
+    /* Open the indices */
+    ExecOpenIndices(resultRelInfo, false);
+
+    /* Insert the batch into the temporary table */
+    heap_multi_insert(rel, batch_state->buffered_id_tuples,
+                      batch_state->num_tuples, GetCurrentCommandId(true),
+                      false, NULL);
+
+    for (i = 0; i < batch_state->num_tuples; i++)
+    {
+        TupleTableSlot *slot;
+        
+        slot = MakeSingleTupleTableSlot(batch_state->id_desc);
+        ExecStoreTuple(batch_state->buffered_id_tuples[i],
+                       slot, InvalidBuffer, false);
+        result = ExecInsertIndexTuples(slot, &(batch_state->buffered_id_tuples[i]->t_self),
+                                       estate, true, NULL, NIL);
+        /* Check if the unique cnstraint is violated */
+        if (list_length(result) != 0)
+        {
+            Datum id;
+            bool isnull;
+
+            id = slot_getattr(slot, 1, &isnull);
+            pfree(slot);
+            ereport(ERROR, (errmsg("Cannot insert duplicate vertex id: %ld",
+                                    DATUM_GET_GRAPHID(id)),
+                            errhint("Entry id %ld is already used",
+                                    get_graphid_entry_id(id))));
+        }
+
+        pfree(slot);
+    }
+    /* Clean up and close the indices */
+    ExecCloseIndices(resultRelInfo);
+
+    FreeExecutorState(estate);
+    heap_close(rel, RowExclusiveLock);
+
+    CommandCounterIncrement();
+}
+
+/*
+ * Initialize the batch insert state for vertices.
+ */
+static void init_vertex_batch_insert(batch_insert_state **batch_state,
+                                     char *label_name, Oid graph_oid,
+                                     Oid temp_table_relid)
+{
+    Relation relation;
+    Oid relid;
+    Relation temp_table_relation;
+
+    /* Open a temporary relation to get the tuple descriptor */
+    relid = get_label_relation(label_name, graph_oid);
+    relation = heap_open(relid, AccessShareLock);
+
+    temp_table_relation = heap_open(temp_table_relid, AccessShareLock);
+
+    /* Initialize the batch insert state */
+    *batch_state = palloc(sizeof(batch_insert_state));
+    (*batch_state)->max_tuples = BATCH_SIZE;
+    (*batch_state)->buffered_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple));
+    (*batch_state)->desc = CreateTupleDescCopy(RelationGetDescr(relation));
+    (*batch_state)->id_desc = CreateTupleDescCopy(RelationGetDescr(temp_table_relation));
+    (*batch_state)->buffered_id_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple));
+    (*batch_state)->num_tuples = 0;
+
+    heap_close(relation, AccessShareLock);
+    heap_close(temp_table_relation, AccessShareLock);
+}
+
+/*
+ * Finish the batch insert for vertices. Insert the
+ * remaining tuples in the batch state and clean up.
+ */
+static void finish_vertex_batch_insert(batch_insert_state **batch_state,
+                                       char *label_name, Oid graph_oid,
+                                       Oid temp_table_relid)
+{
+    if ((*batch_state)->num_tuples > 0)
+    {
+        insert_vertex_batch(*batch_state, label_name, graph_oid, temp_table_relid);
+        (*batch_state)->num_tuples = 0;
+    }
+
+    /* Clean up batch state */
+    pfree((*batch_state)->buffered_tuples);
+    pfree((*batch_state)->buffered_id_tuples);
+    pfree(*batch_state);
+    *batch_state = NULL;
+}
\ No newline at end of file
diff --git a/src/backend/utils/load/age_load.c b/src/backend/utils/load/age_load.c
index 5fa637b6..fd90b604 100644
--- a/src/backend/utils/load/age_load.c
+++ b/src/backend/utils/load/age_load.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 #include "utils/jsonapi.h"
+#include "nodes/makefuncs.h"
 
 #include "utils/load/age_load.h"
 #include "utils/load/ag_load_labels.h"
@@ -26,6 +27,9 @@
 
 static agtype_value *csv_value_to_agtype_value(char *csv_val);
 static bool json_validate(text *json);
+static Oid get_or_create_graph(const Name graph_name);
+static int32 get_or_create_label(Oid graph_oid, char *graph_name,
+                                 char *label_name, char label_kind);
 
 agtype *create_empty_agtype(void)
 {
@@ -307,6 +311,33 @@ void insert_vertex_simple(Oid graph_id, char* label_name,
     CommandCounterIncrement();
 }
 
+void insert_batch(batch_insert_state *batch_state, char *label_name,
+                  Oid graph_oid)
+{
+    Relation label_relation;
+    BulkInsertState bistate;
+    Oid relid;
+
+    // Get the relation OID
+    relid = get_label_relation(label_name, graph_oid);
+
+    // Open the relation
+    label_relation = heap_open(relid, RowExclusiveLock);
+
+    // Prepare the BulkInsertState
+    bistate = GetBulkInsertState();
+
+    // Perform the bulk insert
+    heap_multi_insert(label_relation, batch_state->buffered_tuples,
+                      batch_state->num_tuples, GetCurrentCommandId(true),
+                      false, bistate);
+
+    // Clean up
+    FreeBulkInsertState(bistate);
+    heap_close(label_relation, RowExclusiveLock);
+
+    CommandCounterIncrement();
+}
 
 PG_FUNCTION_INFO_V1(load_labels_from_file);
 Datum load_labels_from_file(PG_FUNCTION_ARGS)
@@ -318,7 +349,7 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
     char* graph_name_str;
     char* label_name_str;
     char* file_path_str;
-    Oid graph_id;
+    Oid graph_oid;
     int32 label_id;
     bool id_field_exists;
     bool load_as_agtype;
@@ -347,19 +378,24 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
     id_field_exists = PG_GETARG_BOOL(3);
     load_as_agtype = PG_GETARG_BOOL(4);
 
-
     graph_name_str = NameStr(*graph_name);
     label_name_str = NameStr(*label_name);
+
+    if (strcmp(label_name_str, "") == 0)
+    {
+        label_name_str = AG_DEFAULT_LABEL_VERTEX;
+    }
+
     file_path_str = text_to_cstring(file_path);
 
-    graph_id = get_graph_oid(graph_name_str);
-    label_id = get_label_id(label_name_str, graph_id);
+    graph_oid = get_or_create_graph(graph_name);
+    label_id = get_or_create_label(graph_oid, graph_name_str,
+                                   label_name_str, LABEL_KIND_VERTEX);
 
-    create_labels_from_csv_file(file_path_str, graph_name_str, graph_id,
+    create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid,
                                 label_name_str, label_id, id_field_exists,
                                 load_as_agtype);
     PG_RETURN_VOID();
-
 }
 
 PG_FUNCTION_INFO_V1(load_edges_from_file);
@@ -372,7 +408,7 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
     char* graph_name_str;
     char* label_name_str;
     char* file_path_str;
-    Oid graph_id;
+    Oid graph_oid;
     int32 label_id;
     bool load_as_agtype;
 
@@ -401,13 +437,94 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
 
     graph_name_str = NameStr(*graph_name);
     label_name_str = NameStr(*label_name);
+
+    if (strcmp(label_name_str, "") == 0)
+    {
+        label_name_str = AG_DEFAULT_LABEL_EDGE;
+    }
+
     file_path_str = text_to_cstring(file_path);
 
-    graph_id = get_graph_oid(graph_name_str);
-    label_id = get_label_id(label_name_str, graph_id);
+    graph_oid = get_or_create_graph(graph_name);
+    label_id = get_or_create_label(graph_oid, graph_name_str,
+                                   label_name_str, LABEL_KIND_EDGE);
 
-    create_edges_from_csv_file(file_path_str, graph_name_str, graph_id,
+    create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid,
                                label_name_str, label_id, load_as_agtype);
     PG_RETURN_VOID();
+}
+
+/*
+ * Helper function to create a graph if it does not exist.
+ * Just returns Oid of the graph if it already exists.
+ */
+static Oid get_or_create_graph(const Name graph_name)
+{
+    Oid graph_oid;
+    char *graph_name_str;
+
+    graph_name_str = NameStr(*graph_name);
+    graph_oid = get_graph_oid(graph_name_str);
+
+    if (OidIsValid(graph_oid))
+    {
+        return graph_oid;
+    }
+
+    create_graph_internal(graph_name);
 
+    graph_oid = get_graph_oid(graph_name_str);
+    
+    ereport(NOTICE,
+            (errmsg("graph \"%s\" has been created", NameStr(*graph_name))));
+    
+    return graph_oid;
 }
+
+/*
+ * Helper function to create a label if it does not exist.
+ * Just returns label_id of the label if it already exists.
+ */
+static int32 get_or_create_label(Oid graph_oid, char *graph_name,
+                                 char *label_name, char label_kind)
+{
+    int32 label_id;
+
+    label_id = get_label_id(label_name, graph_oid);
+
+    /* Check if label exists */
+    if (label_id_is_valid(label_id))
+    {
+        char *label_kind_full = (label_kind == LABEL_KIND_VERTEX)
+                                ? "vertex" : "edge";
+        char opposite_label_kind = (label_kind == LABEL_KIND_VERTEX)
+                                    ? LABEL_KIND_EDGE : LABEL_KIND_VERTEX;
+
+        /* If it exists, but as another label_kind, throw an error */
+        if (get_label_kind(label_name, graph_oid) == opposite_label_kind)
+        {
+            ereport(ERROR,
+                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("label \"%s\" already exists as %s label",
+                            label_name, label_kind_full)));
+        }
+    }
+    else
+    {
+        /* Create a label */
+        RangeVar *rv;
+        List *parent;
+        char *default_label = (label_kind == LABEL_KIND_VERTEX)
+                               ? AG_DEFAULT_LABEL_VERTEX : AG_DEFAULT_LABEL_EDGE;
+        rv = get_label_range_var(graph_name, graph_oid, default_label);
+        parent = list_make1(rv);
+
+        create_label(graph_name, label_name, label_kind, parent);
+        label_id = get_label_id(label_name, graph_oid);
+
+        ereport(NOTICE,
+                (errmsg("VLabel \"%s\" has been created", label_name)));
+    }
+
+    return label_id;
+}
\ No newline at end of file
diff --git a/src/include/catalog/ag_graph.h b/src/include/catalog/ag_graph.h
index b1b8f8e4..9b97aaf3 100644
--- a/src/include/catalog/ag_graph.h
+++ b/src/include/catalog/ag_graph.h
@@ -38,6 +38,7 @@ void update_graph_name(const Name graph_name, const Name new_name);
 
 Oid get_graph_oid(const char *graph_name);
 char *get_graph_namespace_name(const char *graph_name);
+Oid get_graph_namespace(const char *graph_name);
 
 List *get_graphnames(void);
 void drop_graphs(List *graphnames);
diff --git a/src/include/catalog/ag_label.h b/src/include/catalog/ag_label.h
index 7dfa2226..f742fb19 100644
--- a/src/include/catalog/ag_label.h
+++ b/src/include/catalog/ag_label.h
@@ -75,6 +75,8 @@ int32 get_label_id(const char *label_name, Oid label_graph);
 Oid get_label_relation(const char *label_name, Oid label_graph);
 char *get_label_relation_name(const char *label_name, Oid label_graph);
 char get_label_kind(const char *label_name, Oid label_graph);
+char *get_label_seq_relation_name(const char *label_name);
+
 
 bool label_id_exists(Oid label_graph, int32 label_id);
 RangeVar *get_label_range_var(char *graph_name, Oid graph_oid, char *label_name);
diff --git a/src/include/commands/graph_commands.h b/src/include/commands/graph_commands.h
index e4d93fc1..d456ef8c 100644
--- a/src/include/commands/graph_commands.h
+++ b/src/include/commands/graph_commands.h
@@ -21,5 +21,6 @@
 #define AG_GRAPH_COMMANDS_H
 
 Datum create_graph(PG_FUNCTION_ARGS);
+Oid create_graph_internal(const Name graph_name);
 
 #endif
diff --git a/src/include/utils/graphid.h b/src/include/utils/graphid.h
index 844ce652..fd9504da 100644
--- a/src/include/utils/graphid.h
+++ b/src/include/utils/graphid.h
@@ -35,8 +35,9 @@ typedef int64 graphid;
 
 #define label_id_is_valid(id) (id >= LABEL_ID_MIN && id <= LABEL_ID_MAX)
 
-#define ENTRY_ID_MIN INT64CONST(1)
-#define ENTRY_ID_MAX INT64CONST(281474976710655) // 0x0000ffffffffffff
+#define ENTRY_ID_MIN INT64CONST(0)
+/* 0x0000ffffffffffff */
+#define ENTRY_ID_MAX INT64CONST(281474976710655)
 #define INVALID_ENTRY_ID INT64CONST(0)
 
 #define entry_id_is_valid(id) (id >= ENTRY_ID_MIN && id <= ENTRY_ID_MAX)
diff --git a/src/include/utils/load/ag_load_edges.h b/src/include/utils/load/ag_load_edges.h
index 57940d45..03ab89e1 100644
--- a/src/include/utils/load/ag_load_edges.h
+++ b/src/include/utils/load/ag_load_edges.h
@@ -17,6 +17,9 @@
  * under the License.
  */
 
+#include "access/heapam.h"
+#include "utils/load/age_load.h"
+
 #ifndef AG_LOAD_EDGES_H
 #define AG_LOAD_EDGES_H
 
@@ -38,21 +41,22 @@ typedef struct {
     size_t header_row_length;
     size_t curr_row_length;
     char *graph_name;
-    Oid graph_id;
-    char *object_name;
-    int object_id;
+    Oid graph_oid;
+    char *label_name;
+    int label_id;
+    Oid label_seq_relid;
     char *start_vertex;
     char *end_vertex;
     bool load_as_agtype;
-
+    batch_insert_state *batch_state;
 } csv_edge_reader;
 
 
 void edge_field_cb(void *field, size_t field_len, void *data);
 void edge_row_cb(int delim __attribute__((unused)), void *data);
 
-int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_id,
-                                char *object_name, int object_id,
+int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid,
+                                char *label_name, int label_id,
                                 bool load_as_agtype);
 
 #endif //AG_LOAD_EDGES_H
diff --git a/src/include/utils/load/ag_load_labels.h b/src/include/utils/load/ag_load_labels.h
index 71fcf97d..5b24719f 100644
--- a/src/include/utils/load/ag_load_labels.h
+++ b/src/include/utils/load/ag_load_labels.h
@@ -22,6 +22,7 @@
 #define AG_LOAD_LABELS_H
 
 #include "access/heapam.h"
+#include "utils/load/age_load.h"
 
 #define AGE_VERTIX 1
 #define AGE_EDGE 2
@@ -45,19 +46,23 @@ typedef struct {
     size_t header_row_length;
     size_t curr_row_length;
     char *graph_name;
-    Oid graph_id;
-    char *object_name;
-    int object_id;
+    Oid graph_oid;
+    char *label_name;
+    int label_id;
+    Oid label_seq_relid;
+    Oid temp_table_relid;
     bool id_field_exists;
     bool load_as_agtype;
+    int curr_seq_num;
+    batch_insert_state *batch_state;
 } csv_vertex_reader;
 
 
 void vertex_field_cb(void *field, size_t field_len, void *data);
 void vertex_row_cb(int delim __attribute__((unused)), void *data);
 
-int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_id,
-                                char *object_name, int object_id,
+int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid,
+                                char *label_name, int label_id,
                                 bool id_field_exists, bool load_as_agtype);
 
 #endif //AG_LOAD_LABELS_H
diff --git a/src/include/utils/load/age_load.h b/src/include/utils/load/age_load.h
index 1c77645d..f8a18edd 100644
--- a/src/include/utils/load/age_load.h
+++ b/src/include/utils/load/age_load.h
@@ -24,11 +24,28 @@
 #include "catalog/ag_graph.h"
 #include "catalog/ag_label.h"
 #include "commands/label_commands.h"
+#include "commands/graph_commands.h"
 #include "utils/ag_cache.h"
 
 #ifndef AGE_ENTITY_CREATOR_H
 #define AGE_ENTITY_CREATOR_H
 
+#define TEMP_VERTEX_ID_TABLE_SUFFIX "_ag_vertex_ids"
+#define GET_TEMP_VERTEX_ID_TABLE(graph_name) \
+    psprintf("_%s%s", graph_name, TEMP_VERTEX_ID_TABLE_SUFFIX)
+
+#define BATCH_SIZE 1000
+
+typedef struct
+{
+    HeapTuple *buffered_tuples;
+    HeapTuple *buffered_id_tuples;
+    TupleDesc desc;
+    TupleDesc id_desc;
+    int num_tuples;
+    int max_tuples;
+} batch_insert_state;
+
 agtype* create_empty_agtype(void);
 
 agtype* create_agtype_from_list(char **header, char **fields,
@@ -42,5 +59,7 @@ void insert_vertex_simple(Oid graph_id, char *label_name, graphid vertex_id,
 void insert_edge_simple(Oid graph_id, char *label_name, graphid edge_id,
                         graphid start_id, graphid end_id,
                         agtype* end_properties);
+void insert_batch(batch_insert_state *batch_state, char *label_name,
+                  Oid graph_oid);
 
 #endif //AGE_ENTITY_CREATOR_H

Reply via email to