This is an automated email from the ASF dual-hosted git repository.
jgemignani pushed a commit to branch PG11
in repository https://gitbox.apache.org/repos/asf/age.git
The following commit(s) were added to refs/heads/PG11 by this push:
new 7ee9156f Revamp age csv loader (#2044) (#2068)
7ee9156f is described below
commit 7ee9156f2b00d9932d5a9db5274673a7121604a0
Author: Muhammad Taha Naveed <[email protected]>
AuthorDate: Thu Aug 22 19:19:23 2024 +0500
Revamp age csv loader (#2044) (#2068)
* Allow 0 as entry_id
- No regression tests were impacted by this change.
* Use batch inserts to improve performance
- Changed heap_insert to heap_multi_insert since it is faster than
calling heap_insert() in a loop. When multiple tuples can be inserted
on a single page, a single WAL record covers all of them, and the page
only needs to be locked/unlocked once.
- BATCH_SIZE is set to 1000, which is the number of tuples to insert in
a single batch. This number was chosen after some experimentation.
- Change some of the field names to avoid confusion.
* Use sequence for generating ids for edge and vertex
- Sequence is not used if the id_field_exists is true in
load_labels_from_file function, since the entry id is present in the
csv.
* Add function to create temporary table for ids
- Create a temporary table and populate it with the already generated
vertex ids. A unique index is created on the id column to ensure that
new ids generated (using the entry id from the csv) are unique.
* Insert generated ids in the temporary table to enforce uniqueness
- Insert ids in the temporary table and also update the index to
enforce uniqueness.
- If the entry id provided in the CSV is greater than the current
sequence value, the sequence value is updated to match the entry ID.
For example:
Suppose the current sequence value is 1, and the CSV entry ID is 2.
If we use 2 but do not update the sequence to 2, the next time the
CREATE clause is used, the sequence will return 2 as an entry id,
resulting in a duplicate.
- Update batch functions
* Add functions to create graph and label automatically
- These functions will check existence of graph and label, and create
them if they don't exist.
* Add regression tests
---
regress/expected/age_load.out | 139 +++++++++++-----
regress/sql/age_load.sql | 74 ++++++++-
src/backend/catalog/ag_graph.c | 4 +-
src/backend/catalog/ag_label.c | 5 +
src/backend/commands/graph_commands.c | 34 ++--
src/backend/utils/load/ag_load_edges.c | 120 ++++++++++----
src/backend/utils/load/ag_load_labels.c | 283 +++++++++++++++++++++++++++++---
src/backend/utils/load/age_load.c | 137 ++++++++++++++--
src/include/catalog/ag_graph.h | 1 +
src/include/catalog/ag_label.h | 2 +
src/include/commands/graph_commands.h | 1 +
src/include/utils/graphid.h | 5 +-
src/include/utils/load/ag_load_edges.h | 16 +-
src/include/utils/load/ag_load_labels.h | 15 +-
src/include/utils/load/age_load.h | 19 +++
15 files changed, 719 insertions(+), 136 deletions(-)
diff --git a/regress/expected/age_load.out b/regress/expected/age_load.out
index 8635a499..8f216341 100644
--- a/regress/expected/age_load.out
+++ b/regress/expected/age_load.out
@@ -19,6 +19,7 @@
\! cp -r regress/age_load/data regress/instance/data/age_load
LOAD 'age';
SET search_path TO ag_catalog;
+-- Create a country using CREATE clause
SELECT create_graph('agload_test_graph');
NOTICE: graph "agload_test_graph" has been created
create_graph
@@ -26,34 +27,79 @@ NOTICE: graph "agload_test_graph" has been created
(1 row)
-SELECT create_vlabel('agload_test_graph','Country');
-NOTICE: VLabel "Country" has been created
- create_vlabel
----------------
-
+SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1})
RETURN n$$) as (n agtype);
+ n
+----------------------------------------------------------------------------------
+ {"id": 844424930131969, "label": "Country", "properties": {"__id__":
1}}::vertex
(1 row)
+--
+-- Load countries with id
+--
SELECT load_labels_from_file('agload_test_graph', 'Country',
- 'age_load/countries.csv');
+ 'age_load/countries.csv', true);
load_labels_from_file
-----------------------
(1 row)
-SELECT create_vlabel('agload_test_graph','City');
-NOTICE: VLabel "City" has been created
- create_vlabel
----------------
-
+-- A temporary table should have been created with 54 ids; 1 from CREATE and
53 from file
+SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids";
+ ?column?
+----------
+ t
+(1 row)
+
+-- Sequence should be equal to max entry id i.e. 248
+SELECT currval('agload_test_graph."Country_id_seq"')=248;
+ ?column?
+----------
+ t
(1 row)
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+ 'age_load/countries.csv', true);
+ERROR: Cannot insert duplicate vertex id: 844424930131970
+HINT: Entry id 2 is already used
+--
+-- Load cities with id
+--
+-- Should create City label automatically and load cities
SELECT load_labels_from_file('agload_test_graph', 'City',
- 'age_load/cities.csv');
+ 'age_load/cities.csv', true);
+NOTICE: VLabel "City" has been created
load_labels_from_file
-----------------------
(1 row)
+-- Temporary table should have 54+72485 rows now
+SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids";
+ ?column?
+----------
+ t
+(1 row)
+
+-- Sequence should be equal to max entry id i.e. 146941
+SELECT currval('agload_test_graph."City_id_seq"')=146941;
+ ?column?
+----------
+ t
+(1 row)
+
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'City',
+ 'age_load/cities.csv', true);
+ERROR: Cannot insert duplicate vertex id: 1125899906842777
+HINT: Entry id 153 is already used
+--
+-- Load edges -- Connects cities to countries
+--
+-- Should error out for using vertex label
+SELECT load_edges_from_file('agload_test_graph', 'Country',
+ 'age_load/edges.csv');
+ERROR: label "Country" already exists as edge label
SELECT create_elabel('agload_test_graph','has_city');
NOTICE: ELabel "has_city" has been created
create_elabel
@@ -68,6 +114,17 @@ SELECT load_edges_from_file('agload_test_graph', 'has_city',
(1 row)
+-- Sequence should be equal to number of edges loaded i.e. 72485
+SELECT currval('agload_test_graph."has_city_id_seq"')=72485;
+ ?column?
+----------
+ t
+(1 row)
+
+-- Should error out for using edge label
+SELECT load_labels_from_file('agload_test_graph', 'has_city',
+ 'age_load/cities.csv');
+ERROR: label "has_city" already exists as vertex label
SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type
FROM information_schema.tables
WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC;
@@ -83,7 +140,7 @@ WHERE table_schema = 'agload_test_graph' ORDER BY table_name
ASC;
SELECT COUNT(*) FROM agload_test_graph."Country";
count
-------
- 53
+ 54
(1 row)
SELECT COUNT(*) FROM agload_test_graph."City";
@@ -101,7 +158,7 @@ SELECT COUNT(*) FROM agload_test_graph."has_city";
SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) RETURN n$$) as (n
agtype);
count
-------
- 72538
+ 72539
(1 row)
SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN
e$$) as (n agtype);
@@ -110,6 +167,17 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH
(a)-[e]->(b) RETURN e$$
72485
(1 row)
+--
+-- Load countries and cities without id
+--
+-- Should load countries in Country label without error since it should use
sequence now
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+ 'age_load/countries.csv', false);
+ load_labels_from_file
+-----------------------
+
+(1 row)
+
SELECT create_vlabel('agload_test_graph','Country2');
NOTICE: VLabel "Country2" has been created
create_vlabel
@@ -153,6 +221,7 @@ SELECT COUNT(*) FROM agload_test_graph."City2";
SELECT id FROM agload_test_graph."Country" LIMIT 10;
id
-----------------
+ 844424930131969
844424930131970
844424930131971
844424930131974
@@ -162,7 +231,6 @@ SELECT id FROM agload_test_graph."Country" LIMIT 10;
844424930131996
844424930132002
844424930132023
- 844424930132025
(10 rows)
SELECT id FROM agload_test_graph."Country2" LIMIT 10;
@@ -180,13 +248,16 @@ SELECT id FROM agload_test_graph."Country2" LIMIT 10;
1688849860263946
(10 rows)
+-- Should return 2 rows for Country with same properties, but different ids
SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'})
RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype,
"n.iso2" agtype);
id(n) | n.name | n.iso2
-----------------+-----------+--------
844424930131990 | "Belgium" | "BE"
-(1 row)
+ 844424930132223 | "Belgium" | "BE"
+(2 rows)
+-- Should return 1 row
SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'})
RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype,
"n.iso2" agtype);
id(n) | n.name | n.iso2
@@ -194,13 +265,16 @@ SELECT * FROM cypher('agload_test_graph',
$$MATCH(n:Country2 {iso2 : 'BE'})
1688849860263942 | "Belgium" | "BE"
(1 row)
+-- Should return 2 rows for Country with same properties, but different ids
SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'})
RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype,
"n.iso2" agtype);
id(n) | n.name | n.iso2
-----------------+-----------+--------
844424930131983 | "Austria" | "AT"
-(1 row)
+ 844424930132221 | "Austria" | "AT"
+(2 rows)
+-- Should return 1 row
SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'})
RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype,
"n.iso2" agtype);
id(n) | n.name | n.iso2
@@ -208,15 +282,17 @@ SELECT * FROM cypher('agload_test_graph',
$$MATCH(n:Country2 {iso2 : 'AT'})
1688849860263940 | "Austria" | "AT"
(1 row)
+-- Should return 2 rows for Country with same properties, but different ids
SELECT * FROM cypher('agload_test_graph', $$
MATCH (u:Country {region : "Europe"})
WHERE u.name =~ 'Cro.*'
- RETURN u.name, u.region
-$$) AS (result_1 agtype, result_2 agtype);
- result_1 | result_2
------------+----------
- "Croatia" | "Europe"
-(1 row)
+ RETURN id(u), u.name, u.region
+$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype);
+ id(u) | result_1 | result_2
+-----------------+-----------+----------
+ 844424930132023 | "Croatia" | "Europe"
+ 844424930132226 | "Croatia" | "Europe"
+(2 rows)
SELECT drop_graph('agload_test_graph', true);
NOTICE: drop cascades to 7 other objects
@@ -236,22 +312,11 @@ NOTICE: graph "agload_test_graph" has been dropped
--
-- Test property type conversion
--
-SELECT create_graph('agload_conversion');
-NOTICE: graph "agload_conversion" has been created
- create_graph
---------------
-
-(1 row)
-
-- vertex: load as agtype
-SELECT create_vlabel('agload_conversion','Person1');
-NOTICE: VLabel "Person1" has been created
- create_vlabel
----------------
-
-(1 row)
-
+-- Should create graph and label automatically
SELECT load_labels_from_file('agload_conversion', 'Person1',
'age_load/conversion_vertices.csv', true, true);
+NOTICE: graph "agload_conversion" has been created
+NOTICE: VLabel "Person1" has been created
load_labels_from_file
-----------------------
diff --git a/regress/sql/age_load.sql b/regress/sql/age_load.sql
index cee34f59..0d76654f 100644
--- a/regress/sql/age_load.sql
+++ b/regress/sql/age_load.sql
@@ -22,20 +22,65 @@
LOAD 'age';
SET search_path TO ag_catalog;
+
+-- Create a country using CREATE clause
SELECT create_graph('agload_test_graph');
-SELECT create_vlabel('agload_test_graph','Country');
+SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1})
RETURN n$$) as (n agtype);
+
+--
+-- Load countries with id
+--
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+ 'age_load/countries.csv', true);
+
+-- A temporary table should have been created with 54 ids; 1 from CREATE and
53 from file
+SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids";
+
+-- Sequence should be equal to max entry id i.e. 248
+SELECT currval('agload_test_graph."Country_id_seq"')=248;
+
+-- Should error out on loading the same file again due to duplicate id
SELECT load_labels_from_file('agload_test_graph', 'Country',
- 'age_load/countries.csv');
+ 'age_load/countries.csv', true);
+
+--
+-- Load cities with id
+--
-SELECT create_vlabel('agload_test_graph','City');
+-- Should create City label automatically and load cities
SELECT load_labels_from_file('agload_test_graph', 'City',
- 'age_load/cities.csv');
+ 'age_load/cities.csv', true);
+
+-- Temporary table should have 54+72485 rows now
+SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids";
+
+-- Sequence should be equal to max entry id i.e. 146941
+SELECT currval('agload_test_graph."City_id_seq"')=146941;
+
+-- Should error out on loading the same file again due to duplicate id
+SELECT load_labels_from_file('agload_test_graph', 'City',
+ 'age_load/cities.csv', true);
+
+--
+-- Load edges -- Connects cities to countries
+--
+
+-- Should error out for using vertex label
+SELECT load_edges_from_file('agload_test_graph', 'Country',
+ 'age_load/edges.csv');
SELECT create_elabel('agload_test_graph','has_city');
SELECT load_edges_from_file('agload_test_graph', 'has_city',
'age_load/edges.csv');
+-- Sequence should be equal to number of edges loaded i.e. 72485
+SELECT currval('agload_test_graph."has_city_id_seq"')=72485;
+
+-- Should error out for using edge label
+SELECT load_labels_from_file('agload_test_graph', 'has_city',
+ 'age_load/cities.csv');
+
SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type
FROM information_schema.tables
WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC;
@@ -48,6 +93,14 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n)
RETURN n$$) as (n ag
SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN
e$$) as (n agtype);
+--
+-- Load countries and cities without id
+--
+
+-- Should load countries in Country label without error since it should use
sequence now
+SELECT load_labels_from_file('agload_test_graph', 'Country',
+ 'age_load/countries.csv', false);
+
SELECT create_vlabel('agload_test_graph','Country2');
SELECT load_labels_from_file('agload_test_graph', 'Country2',
'age_load/countries.csv', false);
@@ -62,31 +115,36 @@ SELECT COUNT(*) FROM agload_test_graph."City2";
SELECT id FROM agload_test_graph."Country" LIMIT 10;
SELECT id FROM agload_test_graph."Country2" LIMIT 10;
+-- Should return 2 rows for Country with same properties, but different ids
SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'})
RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype,
"n.iso2" agtype);
+-- Should return 1 row
SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'})
RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype,
"n.iso2" agtype);
+-- Should return 2 rows for Country with same properties, but different ids
SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'})
RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype,
"n.iso2" agtype);
+-- Should return 1 row
SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'})
RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype,
"n.iso2" agtype);
+-- Should return 2 rows for Country with same properties, but different ids
SELECT * FROM cypher('agload_test_graph', $$
MATCH (u:Country {region : "Europe"})
WHERE u.name =~ 'Cro.*'
- RETURN u.name, u.region
-$$) AS (result_1 agtype, result_2 agtype);
+ RETURN id(u), u.name, u.region
+$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype);
SELECT drop_graph('agload_test_graph', true);
--
-- Test property type conversion
--
-SELECT create_graph('agload_conversion');
-- vertex: load as agtype
-SELECT create_vlabel('agload_conversion','Person1');
+
+-- Should create graph and label automatically
SELECT load_labels_from_file('agload_conversion', 'Person1',
'age_load/conversion_vertices.csv', true, true);
SELECT * FROM cypher('agload_conversion', $$ MATCH (n:Person1) RETURN
properties(n) $$) as (a agtype);
diff --git a/src/backend/catalog/ag_graph.c b/src/backend/catalog/ag_graph.c
index c1e53d6a..7000d80b 100644
--- a/src/backend/catalog/ag_graph.c
+++ b/src/backend/catalog/ag_graph.c
@@ -36,8 +36,6 @@
#include "catalog/ag_graph.h"
#include "utils/ag_cache.h"
-static Oid get_graph_namespace(const char *graph_name);
-
// INSERT INTO ag_catalog.ag_graph VALUES (graph_name, nsp_id)
Oid insert_graph(const Name graph_name, const Oid nsp_id)
{
@@ -160,7 +158,7 @@ Oid get_graph_oid(const char *graph_name)
return InvalidOid;
}
-static Oid get_graph_namespace(const char *graph_name)
+Oid get_graph_namespace(const char *graph_name)
{
graph_cache_data *cache_data;
diff --git a/src/backend/catalog/ag_label.c b/src/backend/catalog/ag_label.c
index 41a11abe..e60a1d56 100644
--- a/src/backend/catalog/ag_label.c
+++ b/src/backend/catalog/ag_label.c
@@ -185,6 +185,11 @@ char get_label_kind(const char *label_name, Oid
label_graph)
}
}
+char *get_label_seq_relation_name(const char *label_name)
+{
+ return psprintf("%s_id_seq", label_name);
+}
+
PG_FUNCTION_INFO_V1(_label_name);
/*
diff --git a/src/backend/commands/graph_commands.c
b/src/backend/commands/graph_commands.c
index 4216652b..e912518a 100644
--- a/src/backend/commands/graph_commands.c
+++ b/src/backend/commands/graph_commands.c
@@ -43,6 +43,7 @@
#include "catalog/ag_label.h"
#include "commands/label_commands.h"
#include "utils/graphid.h"
+#include "commands/graph_commands.h"
#include "utils/name_validation.h"
/*
@@ -61,10 +62,7 @@ PG_FUNCTION_INFO_V1(create_graph);
/* function that is evoked for creating a graph */
Datum create_graph(PG_FUNCTION_ARGS)
{
- char *graph;
Name graph_name;
- char *graph_name_str;
- Oid nsp_id;
//if no argument is passed with the function, graph name cannot be null
if (PG_ARGISNULL(0))
@@ -76,6 +74,23 @@ Datum create_graph(PG_FUNCTION_ARGS)
//gets graph name as function argument
graph_name = PG_GETARG_NAME(0);
+ create_graph_internal(graph_name);
+
+ ereport(NOTICE,
+ (errmsg("graph \"%s\" has been created", NameStr(*graph_name))));
+
+ /*
+ * According to postgres specification of c-language functions
+ * if function returns void this is the syntax.
+ */
+ PG_RETURN_VOID();
+}
+
+Oid create_graph_internal(const Name graph_name)
+{
+ Oid nsp_id;
+ char *graph_name_str;
+
graph_name_str = NameStr(*graph_name);
//checking if the name of the graph falls under the pre-decided graph
naming conventions(regex)
@@ -101,16 +116,11 @@ Datum create_graph(PG_FUNCTION_ARGS)
//Increment the Command counter before create the generic labels.
CommandCounterIncrement();
- //Create the default label tables
- graph = graph_name->data;
- create_label(graph, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, NIL);
- create_label(graph, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL);
+ /* Create the default label tables */
+ create_label(graph_name_str, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX,
NIL);
+ create_label(graph_name_str, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL);
- ereport(NOTICE,
- (errmsg("graph \"%s\" has been created", NameStr(*graph_name))));
-
- //according to postgres specification of c-language functions if function
returns void this is the syntax
- PG_RETURN_VOID();
+ return nsp_id;
}
static Oid create_schema_for_graph(const Name graph_name)
diff --git a/src/backend/utils/load/ag_load_edges.c
b/src/backend/utils/load/ag_load_edges.c
index d6ae29ff..65ce9366 100644
--- a/src/backend/utils/load/ag_load_edges.c
+++ b/src/backend/utils/load/ag_load_edges.c
@@ -16,21 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include "postgres.h"
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <stdlib.h>
-#include <unistd.h>
-
-#include "utils/load/csv.h"
#include "utils/load/ag_load_edges.h"
-#include "utils/load/age_load.h"
+#include "utils/load/csv.h"
+static void init_edge_batch_insert(batch_insert_state **batch_state,
+ char *label_name, Oid graph_oid);
+static void finish_edge_batch_insert(batch_insert_state **batch_state,
+ char *label_name, Oid graph_oid);
void edge_field_cb(void *field, size_t field_len, void *data)
{
-
csv_edge_reader *cr = (csv_edge_reader*)data;
if (cr->error)
{
@@ -61,8 +58,8 @@ void edge_field_cb(void *field, size_t field_len, void *data)
// Parser calls this function when it detects end of a row
void edge_row_cb(int delim __attribute__((unused)), void *data)
{
-
csv_edge_reader *cr = (csv_edge_reader*)data;
+ batch_insert_state *batch_state = cr->batch_state;
size_t i, n_fields;
int64 start_id_int;
@@ -73,9 +70,12 @@ void edge_row_cb(int delim __attribute__((unused)), void
*data)
graphid end_vertex_graph_id;
int end_vertex_type_id;
- graphid object_graph_id;
+ graphid edge_id;
+ int64 entry_id;
- agtype* props = NULL;
+ Datum values[4];
+ bool isnull[4] = {false, false, false, false};
+ HeapTuple tuple;
n_fields = cr->cur_field;
@@ -94,24 +94,38 @@ void edge_row_cb(int delim __attribute__((unused)), void
*data)
}
else
{
- object_graph_id = make_graphid(cr->object_id, (int64)cr->row);
+ entry_id = nextval_internal(cr->label_seq_relid, true);
+ edge_id = make_graphid(cr->label_id, entry_id);
start_id_int = strtol(cr->fields[0], NULL, 10);
- start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_id);
+ start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid);
end_id_int = strtol(cr->fields[2], NULL, 10);
- end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_id);
+ end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_oid);
start_vertex_graph_id = make_graphid(start_vertex_type_id,
start_id_int);
end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
- props = create_agtype_from_list_i(cr->header, cr->fields,
- n_fields, 4, cr->load_as_agtype);
+ /* Fill the values */
+ values[0] = GRAPHID_GET_DATUM(edge_id);
+ values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
+ values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
+ values[3] = AGTYPE_P_GET_DATUM(
+ create_agtype_from_list_i(
+ cr->header, cr->fields,
+ n_fields, 4, cr->load_as_agtype));
+
+ /* Create and insert the tuple into the batch state */
+ tuple = heap_form_tuple(batch_state->desc, values, isnull);
+ batch_state->buffered_tuples[batch_state->num_tuples] = tuple;
- insert_edge_simple(cr->graph_id, cr->object_name,
- object_graph_id, start_vertex_graph_id,
- end_vertex_graph_id, props);
+ batch_state->num_tuples++;
- pfree(props);
+ if (batch_state->num_tuples >= batch_state->max_tuples)
+ {
+ /* Insert the batch when it is full (i.e. BATCH_SIZE) */
+ insert_batch(batch_state, cr->label_name, cr->graph_oid);
+ batch_state->num_tuples = 0;
+ }
}
for (i = 0; i < n_fields; ++i)
@@ -124,7 +138,6 @@ void edge_row_cb(int delim __attribute__((unused)), void
*data)
ereport(NOTICE,(errmsg("THere is some error")));
}
-
cr->cur_field = 0;
cr->curr_row_length = 0;
cr->row += 1;
@@ -156,9 +169,9 @@ static int is_term(unsigned char c)
int create_edges_from_csv_file(char *file_path,
char *graph_name,
- Oid graph_id,
- char *object_name,
- int object_id,
+ Oid graph_oid,
+ char *label_name,
+ int label_id,
bool load_as_agtype)
{
@@ -168,6 +181,8 @@ int create_edges_from_csv_file(char *file_path,
size_t bytes_read;
unsigned char options = 0;
csv_edge_reader cr;
+ char *label_seq_name;
+ Oid nsp_id;
if (csv_init(&p, options) != 0)
{
@@ -185,6 +200,8 @@ int create_edges_from_csv_file(char *file_path,
(errmsg("Failed to open %s\n", file_path)));
}
+ nsp_id = get_graph_namespace(graph_name);
+ label_seq_name = get_label_seq_relation_name(label_name);
memset((void*)&cr, 0, sizeof(csv_edge_reader));
cr.alloc = 128;
@@ -193,11 +210,15 @@ int create_edges_from_csv_file(char *file_path,
cr.header_row_length = 0;
cr.curr_row_length = 0;
cr.graph_name = graph_name;
- cr.graph_id = graph_id;
- cr.object_name = object_name;
- cr.object_id = object_id;
+ cr.graph_oid = graph_oid;
+ cr.label_name = label_name;
+ cr.label_id = label_id;
+ cr.label_seq_relid = get_relname_relid(label_seq_name, nsp_id);
cr.load_as_agtype = load_as_agtype;
+ /* Initialize the batch insert state */
+ init_edge_batch_insert(&cr.batch_state, label_name, graph_oid);
+
while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
{
if (csv_parse(&p, buf, bytes_read, edge_field_cb,
@@ -210,6 +231,9 @@ int create_edges_from_csv_file(char *file_path,
csv_fini(&p, edge_field_cb, edge_row_cb, &cr);
+ /* Finish any remaining batch inserts */
+ finish_edge_batch_insert(&cr.batch_state, label_name, graph_oid);
+
if (ferror(fp))
{
ereport(ERROR, (errmsg("Error while reading file %s\n", file_path)));
@@ -221,3 +245,43 @@ int create_edges_from_csv_file(char *file_path,
csv_free(&p);
return EXIT_SUCCESS;
}
+
+/*
+ * Initialize the batch insert state for edges.
+ */
+static void init_edge_batch_insert(batch_insert_state **batch_state,
+ char *label_name, Oid graph_oid)
+{
+ Relation relation;
+
+ // Open a temporary relation to get the tuple descriptor
+ relation = heap_open(get_label_relation(label_name, graph_oid),
AccessShareLock);
+
+ // Initialize the batch insert state
+ *batch_state = palloc(sizeof(batch_insert_state));
+ (*batch_state)->max_tuples = BATCH_SIZE;
+ (*batch_state)->buffered_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple));
+ (*batch_state)->desc = CreateTupleDescCopy(RelationGetDescr(relation));
+ (*batch_state)->num_tuples = 0;
+
+ heap_close(relation, AccessShareLock);
+}
+
+/*
+ * Finish the batch insert for edges. Insert the
+ * remaining tuples in the batch state and clean up.
+ */
+static void finish_edge_batch_insert(batch_insert_state **batch_state,
+ char *label_name, Oid graph_oid)
+{
+ if ((*batch_state)->num_tuples > 0)
+ {
+ insert_batch(*batch_state, label_name, graph_oid);
+ (*batch_state)->num_tuples = 0;
+ }
+
+ // Clean up batch state
+ pfree((*batch_state)->buffered_tuples);
+ pfree(*batch_state);
+ *batch_state = NULL;
+}
\ No newline at end of file
diff --git a/src/backend/utils/load/ag_load_labels.c
b/src/backend/utils/load/ag_load_labels.c
index 6f79071d..194a6dc8 100644
--- a/src/backend/utils/load/ag_load_labels.c
+++ b/src/backend/utils/load/ag_load_labels.c
@@ -17,11 +17,28 @@
* under the License.
*/
#include "postgres.h"
+#include "executor/spi.h"
+#include "catalog/namespace.h"
+#include "executor/executor.h"
+#include "access/xact.h"
+#include "executor/tuptable.h"
+#include "utils/rel.h"
#include "utils/load/ag_load_labels.h"
-#include "utils/load/age_load.h"
#include "utils/load/csv.h"
+static void setup_temp_table_for_vertex_ids(char *graph_name);
+static void insert_batch_in_temp_table(batch_insert_state *batch_state,
+ Oid graph_oid, Oid relid);
+static void init_vertex_batch_insert(batch_insert_state **batch_state,
+ char *label_name, Oid graph_oid,
+ Oid temp_table_relid);
+static void finish_vertex_batch_insert(batch_insert_state **batch_state,
+ char *label_name, Oid graph_oid,
+ Oid temp_table_relid);
+static void insert_vertex_batch(batch_insert_state *batch_state, char
*label_name,
+ Oid graph_oid, Oid temp_table_relid);
+
void vertex_field_cb(void *field, size_t field_len, void *data)
{
@@ -55,16 +72,20 @@ void vertex_field_cb(void *field, size_t field_len, void
*data)
void vertex_row_cb(int delim __attribute__((unused)), void *data)
{
-
csv_vertex_reader *cr = (csv_vertex_reader*)data;
- agtype *props = NULL;
+ batch_insert_state *batch_state = cr->batch_state;
size_t i, n_fields;
- graphid object_graph_id;
- int64 label_id_int;
+ graphid vertex_id;
+ int64 entry_id;
+ Datum values[2];
+ bool nulls[2] = {false, false};
+ Datum temp_table_values[1];
+ bool temp_table_nulls[1] = {false};
+ HeapTuple tuple;
+ HeapTuple temp_table_tuple;
n_fields = cr->cur_field;
-
if (cr->row == 0)
{
cr->header_num = cr->cur_field;
@@ -82,36 +103,61 @@ void vertex_row_cb(int delim __attribute__((unused)), void
*data)
{
if (cr->id_field_exists)
{
- label_id_int = strtol(cr->fields[0], NULL, 10);
+ entry_id = strtol(cr->fields[0], NULL, 10);
+ if (entry_id > cr->curr_seq_num)
+ {
+ DirectFunctionCall2(setval_oid,
+ ObjectIdGetDatum(cr->label_seq_relid),
+ Int64GetDatum(entry_id));
+ cr->curr_seq_num = entry_id;
+ }
}
else
{
- label_id_int = (int64)cr->row;
+ entry_id = nextval_internal(cr->label_seq_relid, true);
}
- object_graph_id = make_graphid(cr->object_id, label_id_int);
+ vertex_id = make_graphid(cr->label_id, entry_id);
- props = create_agtype_from_list(cr->header, cr->fields,
- n_fields, label_id_int,
- cr->load_as_agtype);
- insert_vertex_simple(cr->graph_id, cr->object_name,
- object_graph_id, props);
- pfree(props);
- }
+ /* Fill the values */
+ values[0] = GRAPHID_GET_DATUM(vertex_id);
+ values[1] = AGTYPE_P_GET_DATUM(
+ create_agtype_from_list(cr->header, cr->fields,
+ n_fields, entry_id,
+ cr->load_as_agtype));
+
+ temp_table_values[0] = GRAPHID_GET_DATUM(vertex_id);
+
+ /* Create the tuple */
+ tuple = heap_form_tuple(batch_state->desc, values, nulls);
+ temp_table_tuple = heap_form_tuple(batch_state->id_desc,
temp_table_values,
+ temp_table_nulls);
+
+ /* Store the tuple in the batch state */
+ batch_state->buffered_tuples[batch_state->num_tuples] = tuple;
+ batch_state->buffered_id_tuples[batch_state->num_tuples] =
temp_table_tuple;
+
+ batch_state->num_tuples++;
+ if (batch_state->num_tuples >= batch_state->max_tuples)
+ {
+ /* Insert the batch when it is full (i.e. BATCH_SIZE) */
+ insert_vertex_batch(batch_state, cr->label_name, cr->graph_oid,
+ cr->temp_table_relid);
+ batch_state->num_tuples = 0;
+ }
+ }
for (i = 0; i < n_fields; ++i)
{
free(cr->fields[i]);
}
-
if (cr->error)
{
ereport(NOTICE,(errmsg("THere is some error")));
}
-
cr->cur_field = 0;
cr->curr_row_length = 0;
cr->row += 1;
@@ -143,9 +189,9 @@ static int is_term(unsigned char c)
int create_labels_from_csv_file(char *file_path,
char *graph_name,
- Oid graph_id,
- char *object_name,
- int object_id,
+ Oid graph_oid,
+ char *label_name,
+ int label_id,
bool id_field_exists,
bool load_as_agtype)
{
@@ -156,6 +202,9 @@ int create_labels_from_csv_file(char *file_path,
size_t bytes_read;
unsigned char options = 0;
csv_vertex_reader cr;
+ char *label_seq_name;
+ Oid temp_table_relid;
+ Oid nsp_id;
if (csv_init(&p, options) != 0)
{
@@ -163,6 +212,13 @@ int create_labels_from_csv_file(char *file_path,
(errmsg("Failed to initialize csv parser\n")));
}
+ temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
+ if (!OidIsValid(temp_table_relid))
+ {
+ setup_temp_table_for_vertex_ids(graph_name);
+ temp_table_relid =
RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name));
+ }
+
csv_set_space_func(&p, is_space);
csv_set_term_func(&p, is_term);
@@ -173,6 +229,8 @@ int create_labels_from_csv_file(char *file_path,
(errmsg("Failed to open %s\n", file_path)));
}
+ nsp_id = get_graph_namespace(graph_name);
+ label_seq_name = get_label_seq_relation_name(label_name);
memset((void*)&cr, 0, sizeof(csv_vertex_reader));
@@ -182,13 +240,29 @@ int create_labels_from_csv_file(char *file_path,
cr.header_row_length = 0;
cr.curr_row_length = 0;
cr.graph_name = graph_name;
- cr.graph_id = graph_id;
- cr.object_name = object_name;
- cr.object_id = object_id;
+ cr.graph_oid = graph_oid;
+ cr.label_name = label_name;
+ cr.label_id = label_id;
cr.id_field_exists = id_field_exists;
+ cr.label_seq_relid = get_relname_relid(label_seq_name, nsp_id);
cr.load_as_agtype = load_as_agtype;
+ cr.temp_table_relid = temp_table_relid;
+
+ if (cr.id_field_exists)
+ {
+ /*
+ * Set the curr_seq_num since we will need it to compare with
+ * incoming entry_id.
+ *
+ * We cant use currval because it will error out if nextval was
+ * not called before in the session.
+ */
+ cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true);
+ }
-
+ /* Initialize the batch insert state */
+ init_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
+ cr.temp_table_relid);
while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
{
@@ -202,6 +276,10 @@ int create_labels_from_csv_file(char *file_path,
csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr);
+ /* Finish any remaining batch inserts */
+ finish_vertex_batch_insert(&cr.batch_state, label_name, graph_oid,
+ cr.temp_table_relid);
+
if (ferror(fp))
{
ereport(ERROR, (errmsg("Error while reading file %s\n",
@@ -214,3 +292,158 @@ int create_labels_from_csv_file(char *file_path,
csv_free(&p);
return EXIT_SUCCESS;
}
+
+/*
+ * Flush one batch of buffered vertices. The ids are written to the temporary
+ * id table first, then the vertex tuples are written to the label relation.
+ */
+static void insert_vertex_batch(batch_insert_state *batch_state,
+                                char *label_name, Oid graph_oid,
+                                Oid temp_table_relid)
+{
+    /* ids go to the temporary id table and its index */
+    insert_batch_in_temp_table(batch_state, graph_oid, temp_table_relid);
+
+    /* vertex tuples go to the relation of the label */
+    insert_batch(batch_state, label_name, graph_oid);
+}
+
+/*
+ * Create and populate a temporary table with the vertex ids that are already
+ * present in the graph. The loader uses this table to check whether a vertex
+ * id it generates is a duplicate; a unique index on the id column enforces
+ * the uniqueness.
+ *
+ * We don't need this for loading edges since their ids are generated from a
+ * sequence and are therefore unique.
+ *
+ * graph_name: name of the graph whose vertex ids are copied.
+ */
+static void setup_temp_table_for_vertex_ids(char *graph_name)
+{
+    char *temp_table_name;
+    char *create_as_query;
+    char *index_query;
+
+    temp_table_name = GET_TEMP_VERTEX_ID_TABLE(graph_name);
+
+    /*
+     * Quote the table name the same way the graph schema is quoted. The
+     * caller looks the table up by its exact name, and an unquoted
+     * identifier would be folded to lower case by the parser.
+     */
+    create_as_query = psprintf("CREATE TEMP TABLE IF NOT EXISTS \"%s\" AS "
+                               "SELECT DISTINCT id FROM \"%s\".%s",
+                               temp_table_name, graph_name,
+                               AG_DEFAULT_LABEL_VERTEX);
+
+    index_query = psprintf("CREATE UNIQUE INDEX ON \"%s\" (id)",
+                           temp_table_name);
+
+    if (SPI_connect() != SPI_OK_CONNECT)
+    {
+        ereport(ERROR, (errmsg("could not connect to SPI manager")));
+    }
+
+    /* SPI_execute reports its own failures with a negative return code */
+    if (SPI_execute(create_as_query, false, 0) < 0)
+    {
+        ereport(ERROR,
+                (errmsg("could not create temporary table \"%s\"",
+                        temp_table_name)));
+    }
+
+    if (SPI_execute(index_query, false, 0) < 0)
+    {
+        ereport(ERROR,
+                (errmsg("could not create unique index on \"%s\"",
+                        temp_table_name)));
+    }
+
+    SPI_finish();
+
+    pfree(create_as_query);
+    pfree(index_query);
+    pfree(temp_table_name);
+}
+
+/*
+ * Insert the buffered id tuples of a batch into the temporary id table and
+ * add each of them to the table's indices. An id that conflicts with an
+ * existing index entry is reported as a duplicate vertex id.
+ *
+ * batch_state: holds buffered_id_tuples, num_tuples and id_desc
+ * graph_oid:   not used by this function
+ * relid:       Oid of the temporary id table
+ */
+static void insert_batch_in_temp_table(batch_insert_state *batch_state,
+                                       Oid graph_oid, Oid relid)
+{
+    int i;
+    EState *estate;
+    ResultRelInfo *resultRelInfo;
+    Relation rel;
+    TupleTableSlot *slot;
+
+    rel = heap_open(relid, RowExclusiveLock);
+
+    /* Initialize executor state */
+    estate = CreateExecutorState();
+
+    /* Initialize result relation information */
+    resultRelInfo = makeNode(ResultRelInfo);
+    InitResultRelInfo(resultRelInfo, rel, 1, NULL, estate->es_instrument);
+    estate->es_result_relation_info = resultRelInfo;
+
+    /* Open the indices */
+    ExecOpenIndices(resultRelInfo, false);
+
+    /* Insert the batch into the temporary table */
+    heap_multi_insert(rel, batch_state->buffered_id_tuples,
+                      batch_state->num_tuples, GetCurrentCommandId(true),
+                      false, NULL);
+
+    /* One slot is enough; it is re-pointed at each tuple in turn */
+    slot = MakeSingleTupleTableSlot(batch_state->id_desc);
+
+    for (i = 0; i < batch_state->num_tuples; i++)
+    {
+        HeapTuple id_tuple = batch_state->buffered_id_tuples[i];
+        List *recheck_indexes;
+
+        ExecStoreTuple(id_tuple, slot, InvalidBuffer, false);
+        recheck_indexes = ExecInsertIndexTuples(slot, &(id_tuple->t_self),
+                                                estate, true, NULL, NIL);
+
+        /* Check if the unique constraint is violated */
+        if (recheck_indexes != NIL)
+        {
+            Datum id;
+            bool isnull;
+            graphid vertex_id;
+
+            id = slot_getattr(slot, 1, &isnull);
+            vertex_id = DATUM_GET_GRAPHID(id);
+
+            /* ids are 64 bit; print them as long long on every platform */
+            ereport(ERROR,
+                    (errmsg("Cannot insert duplicate vertex id: %lld",
+                            (long long) vertex_id),
+                     errhint("Entry id %lld is already used",
+                             (long long) get_graphid_entry_id(vertex_id))));
+        }
+    }
+
+    /* Releases the slot together with its internal arrays */
+    ExecDropSingleTupleTableSlot(slot);
+
+    /* Clean up and close the indices */
+    ExecCloseIndices(resultRelInfo);
+
+    FreeExecutorState(estate);
+    heap_close(rel, RowExclusiveLock);
+
+    CommandCounterIncrement();
+}
+
+/*
+ * Allocate and initialize the batch insert state used while loading
+ * vertices. The state receives copies of the tuple descriptors of the label
+ * relation and of the temporary id table, plus one tuple buffer of
+ * BATCH_SIZE entries for each of them.
+ */
+static void init_vertex_batch_insert(batch_insert_state **batch_state,
+                                     char *label_name, Oid graph_oid,
+                                     Oid temp_table_relid)
+{
+    batch_insert_state *state;
+    Relation label_rel;
+    Relation id_rel;
+
+    /* Both relations are opened only to read their tuple descriptors */
+    label_rel = heap_open(get_label_relation(label_name, graph_oid),
+                          AccessShareLock);
+    id_rel = heap_open(temp_table_relid, AccessShareLock);
+
+    state = palloc(sizeof(batch_insert_state));
+    *batch_state = state;
+
+    state->max_tuples = BATCH_SIZE;
+    state->buffered_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple));
+    state->desc = CreateTupleDescCopy(RelationGetDescr(label_rel));
+    state->id_desc = CreateTupleDescCopy(RelationGetDescr(id_rel));
+    state->buffered_id_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple));
+    state->num_tuples = 0;
+
+    heap_close(label_rel, AccessShareLock);
+    heap_close(id_rel, AccessShareLock);
+}
+
+/*
+ * Finish the batch insert for vertices: insert the tuples that are still
+ * buffered, release everything owned by the batch state and reset the
+ * caller's pointer to NULL.
+ */
+static void finish_vertex_batch_insert(batch_insert_state **batch_state,
+                                       char *label_name, Oid graph_oid,
+                                       Oid temp_table_relid)
+{
+    batch_insert_state *state = *batch_state;
+
+    if (state->num_tuples > 0)
+    {
+        insert_vertex_batch(state, label_name, graph_oid, temp_table_relid);
+        state->num_tuples = 0;
+    }
+
+    /*
+     * Clean up batch state. The descriptors are private copies made by
+     * init_vertex_batch_insert, so they are freed here as well.
+     */
+    FreeTupleDesc(state->desc);
+    FreeTupleDesc(state->id_desc);
+    pfree(state->buffered_tuples);
+    pfree(state->buffered_id_tuples);
+    pfree(state);
+    *batch_state = NULL;
+}
\ No newline at end of file
diff --git a/src/backend/utils/load/age_load.c
b/src/backend/utils/load/age_load.c
index 5fa637b6..fd90b604 100644
--- a/src/backend/utils/load/age_load.c
+++ b/src/backend/utils/load/age_load.c
@@ -19,6 +19,7 @@
#include "postgres.h"
#include "utils/jsonapi.h"
+#include "nodes/makefuncs.h"
#include "utils/load/age_load.h"
#include "utils/load/ag_load_labels.h"
@@ -26,6 +27,9 @@
static agtype_value *csv_value_to_agtype_value(char *csv_val);
static bool json_validate(text *json);
+static Oid get_or_create_graph(const Name graph_name);
+static int32 get_or_create_label(Oid graph_oid, char *graph_name,
+ char *label_name, char label_kind);
agtype *create_empty_agtype(void)
{
@@ -307,6 +311,33 @@ void insert_vertex_simple(Oid graph_id, char* label_name,
CommandCounterIncrement();
}
+/*
+ * Write every tuple buffered in batch_state into the relation of the given
+ * label with a single heap_multi_insert() call.
+ */
+void insert_batch(batch_insert_state *batch_state, char *label_name,
+                  Oid graph_oid)
+{
+    Oid label_relid = get_label_relation(label_name, graph_oid);
+    Relation rel = heap_open(label_relid, RowExclusiveLock);
+    BulkInsertState bulk_state = GetBulkInsertState();
+
+    /* Insert the whole batch at once */
+    heap_multi_insert(rel, batch_state->buffered_tuples,
+                      batch_state->num_tuples, GetCurrentCommandId(true),
+                      false, bulk_state);
+
+    /* Release the bulk insert state and the relation */
+    FreeBulkInsertState(bulk_state);
+    heap_close(rel, RowExclusiveLock);
+
+    CommandCounterIncrement();
+}
PG_FUNCTION_INFO_V1(load_labels_from_file);
Datum load_labels_from_file(PG_FUNCTION_ARGS)
@@ -318,7 +349,7 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
char* graph_name_str;
char* label_name_str;
char* file_path_str;
- Oid graph_id;
+ Oid graph_oid;
int32 label_id;
bool id_field_exists;
bool load_as_agtype;
@@ -347,19 +378,24 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
id_field_exists = PG_GETARG_BOOL(3);
load_as_agtype = PG_GETARG_BOOL(4);
-
graph_name_str = NameStr(*graph_name);
label_name_str = NameStr(*label_name);
+
+ if (strcmp(label_name_str, "") == 0)
+ {
+ label_name_str = AG_DEFAULT_LABEL_VERTEX;
+ }
+
file_path_str = text_to_cstring(file_path);
- graph_id = get_graph_oid(graph_name_str);
- label_id = get_label_id(label_name_str, graph_id);
+ graph_oid = get_or_create_graph(graph_name);
+ label_id = get_or_create_label(graph_oid, graph_name_str,
+ label_name_str, LABEL_KIND_VERTEX);
- create_labels_from_csv_file(file_path_str, graph_name_str, graph_id,
+ create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid,
label_name_str, label_id, id_field_exists,
load_as_agtype);
PG_RETURN_VOID();
-
}
PG_FUNCTION_INFO_V1(load_edges_from_file);
@@ -372,7 +408,7 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
char* graph_name_str;
char* label_name_str;
char* file_path_str;
- Oid graph_id;
+ Oid graph_oid;
int32 label_id;
bool load_as_agtype;
@@ -401,13 +437,94 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
graph_name_str = NameStr(*graph_name);
label_name_str = NameStr(*label_name);
+
+ if (strcmp(label_name_str, "") == 0)
+ {
+ label_name_str = AG_DEFAULT_LABEL_EDGE;
+ }
+
file_path_str = text_to_cstring(file_path);
- graph_id = get_graph_oid(graph_name_str);
- label_id = get_label_id(label_name_str, graph_id);
+ graph_oid = get_or_create_graph(graph_name);
+ label_id = get_or_create_label(graph_oid, graph_name_str,
+ label_name_str, LABEL_KIND_EDGE);
- create_edges_from_csv_file(file_path_str, graph_name_str, graph_id,
+ create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid,
label_name_str, label_id, load_as_agtype);
PG_RETURN_VOID();
+}
+
+/*
+ * Return the Oid of the named graph. When the graph does not exist yet it is
+ * created first and a NOTICE about the creation is raised.
+ */
+static Oid get_or_create_graph(const Name graph_name)
+{
+    char *name_str = NameStr(*graph_name);
+    Oid oid = get_graph_oid(name_str);
+
+    if (!OidIsValid(oid))
+    {
+        /* graph is missing: create it and look its Oid up again */
+        create_graph_internal(graph_name);
+        oid = get_graph_oid(name_str);
+
+        ereport(NOTICE,
+                (errmsg("graph \"%s\" has been created", name_str)));
+    }
+
+    return oid;
}
+
+/*
+ * Helper function to create a label if it does not exist.
+ * Just returns label_id of the label if it already exists.
+ *
+ * graph_oid:  Oid of the graph the label belongs to
+ * graph_name: name of that graph
+ * label_name: name of the label to look up or create
+ * label_kind: LABEL_KIND_VERTEX or LABEL_KIND_EDGE
+ *
+ * Raises an ERROR when the label already exists with the other kind. A newly
+ * created label inherits from the default label of its kind, and a NOTICE
+ * about the creation is raised.
+ */
+static int32 get_or_create_label(Oid graph_oid, char *graph_name,
+                                 char *label_name, char label_kind)
+{
+    bool is_vertex = (label_kind == LABEL_KIND_VERTEX);
+    int32 label_id;
+
+    label_id = get_label_id(label_name, graph_oid);
+
+    /* Check if label exists */
+    if (label_id_is_valid(label_id))
+    {
+        char opposite_label_kind = is_vertex ? LABEL_KIND_EDGE
+                                             : LABEL_KIND_VERTEX;
+        /* the kind the label has when it conflicts with the request */
+        char *existing_kind_full = is_vertex ? "edge" : "vertex";
+
+        /* If it exists, but as another label_kind, throw an error */
+        if (get_label_kind(label_name, graph_oid) == opposite_label_kind)
+        {
+            ereport(ERROR,
+                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("label \"%s\" already exists as %s label",
+                            label_name, existing_kind_full)));
+        }
+    }
+    else
+    {
+        /* Create a label */
+        RangeVar *rv;
+        List *parent;
+        char *default_label = is_vertex ? AG_DEFAULT_LABEL_VERTEX
+                                        : AG_DEFAULT_LABEL_EDGE;
+
+        rv = get_label_range_var(graph_name, graph_oid, default_label);
+        parent = list_make1(rv);
+
+        create_label(graph_name, label_name, label_kind, parent);
+        label_id = get_label_id(label_name, graph_oid);
+
+        /* name the kind of label that was actually created */
+        ereport(NOTICE,
+                (errmsg("%s \"%s\" has been created",
+                        is_vertex ? "VLabel" : "ELabel", label_name)));
+    }
+
+    return label_id;
+}
\ No newline at end of file
diff --git a/src/include/catalog/ag_graph.h b/src/include/catalog/ag_graph.h
index b1b8f8e4..9b97aaf3 100644
--- a/src/include/catalog/ag_graph.h
+++ b/src/include/catalog/ag_graph.h
@@ -38,6 +38,7 @@ void update_graph_name(const Name graph_name, const Name
new_name);
Oid get_graph_oid(const char *graph_name);
char *get_graph_namespace_name(const char *graph_name);
+Oid get_graph_namespace(const char *graph_name);
List *get_graphnames(void);
void drop_graphs(List *graphnames);
diff --git a/src/include/catalog/ag_label.h b/src/include/catalog/ag_label.h
index 7dfa2226..f742fb19 100644
--- a/src/include/catalog/ag_label.h
+++ b/src/include/catalog/ag_label.h
@@ -75,6 +75,8 @@ int32 get_label_id(const char *label_name, Oid label_graph);
Oid get_label_relation(const char *label_name, Oid label_graph);
char *get_label_relation_name(const char *label_name, Oid label_graph);
char get_label_kind(const char *label_name, Oid label_graph);
+char *get_label_seq_relation_name(const char *label_name);
+
bool label_id_exists(Oid label_graph, int32 label_id);
RangeVar *get_label_range_var(char *graph_name, Oid graph_oid, char
*label_name);
diff --git a/src/include/commands/graph_commands.h
b/src/include/commands/graph_commands.h
index e4d93fc1..d456ef8c 100644
--- a/src/include/commands/graph_commands.h
+++ b/src/include/commands/graph_commands.h
@@ -21,5 +21,6 @@
#define AG_GRAPH_COMMANDS_H
Datum create_graph(PG_FUNCTION_ARGS);
+Oid create_graph_internal(const Name graph_name);
#endif
diff --git a/src/include/utils/graphid.h b/src/include/utils/graphid.h
index 844ce652..fd9504da 100644
--- a/src/include/utils/graphid.h
+++ b/src/include/utils/graphid.h
@@ -35,8 +35,9 @@ typedef int64 graphid;
#define label_id_is_valid(id) (id >= LABEL_ID_MIN && id <= LABEL_ID_MAX)
-#define ENTRY_ID_MIN INT64CONST(1)
-#define ENTRY_ID_MAX INT64CONST(281474976710655) // 0x0000ffffffffffff
+#define ENTRY_ID_MIN INT64CONST(0)
+/* 0x0000ffffffffffff */
+#define ENTRY_ID_MAX INT64CONST(281474976710655)
#define INVALID_ENTRY_ID INT64CONST(0)
#define entry_id_is_valid(id) (id >= ENTRY_ID_MIN && id <= ENTRY_ID_MAX)
diff --git a/src/include/utils/load/ag_load_edges.h
b/src/include/utils/load/ag_load_edges.h
index 57940d45..03ab89e1 100644
--- a/src/include/utils/load/ag_load_edges.h
+++ b/src/include/utils/load/ag_load_edges.h
@@ -17,6 +17,9 @@
* under the License.
*/
+#include "access/heapam.h"
+#include "utils/load/age_load.h"
+
#ifndef AG_LOAD_EDGES_H
#define AG_LOAD_EDGES_H
@@ -38,21 +41,22 @@ typedef struct {
size_t header_row_length;
size_t curr_row_length;
char *graph_name;
- Oid graph_id;
- char *object_name;
- int object_id;
+ Oid graph_oid;
+ char *label_name;
+ int label_id;
+ Oid label_seq_relid;
char *start_vertex;
char *end_vertex;
bool load_as_agtype;
-
+ batch_insert_state *batch_state;
} csv_edge_reader;
void edge_field_cb(void *field, size_t field_len, void *data);
void edge_row_cb(int delim __attribute__((unused)), void *data);
-int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_id,
- char *object_name, int object_id,
+int create_edges_from_csv_file(char *file_path, char *graph_name, Oid
graph_oid,
+ char *label_name, int label_id,
bool load_as_agtype);
#endif //AG_LOAD_EDGES_H
diff --git a/src/include/utils/load/ag_load_labels.h
b/src/include/utils/load/ag_load_labels.h
index 71fcf97d..5b24719f 100644
--- a/src/include/utils/load/ag_load_labels.h
+++ b/src/include/utils/load/ag_load_labels.h
@@ -22,6 +22,7 @@
#define AG_LOAD_LABELS_H
#include "access/heapam.h"
+#include "utils/load/age_load.h"
#define AGE_VERTIX 1
#define AGE_EDGE 2
@@ -45,19 +46,23 @@ typedef struct {
size_t header_row_length;
size_t curr_row_length;
char *graph_name;
- Oid graph_id;
- char *object_name;
- int object_id;
+ Oid graph_oid;
+ char *label_name;
+ int label_id;
+ Oid label_seq_relid;
+ Oid temp_table_relid;
bool id_field_exists;
bool load_as_agtype;
+ int curr_seq_num;
+ batch_insert_state *batch_state;
} csv_vertex_reader;
void vertex_field_cb(void *field, size_t field_len, void *data);
void vertex_row_cb(int delim __attribute__((unused)), void *data);
-int create_labels_from_csv_file(char *file_path, char *graph_name, Oid
graph_id,
- char *object_name, int object_id,
+int create_labels_from_csv_file(char *file_path, char *graph_name, Oid
graph_oid,
+ char *label_name, int label_id,
bool id_field_exists, bool load_as_agtype);
#endif //AG_LOAD_LABELS_H
diff --git a/src/include/utils/load/age_load.h
b/src/include/utils/load/age_load.h
index 1c77645d..f8a18edd 100644
--- a/src/include/utils/load/age_load.h
+++ b/src/include/utils/load/age_load.h
@@ -24,11 +24,28 @@
#include "catalog/ag_graph.h"
#include "catalog/ag_label.h"
#include "commands/label_commands.h"
+#include "commands/graph_commands.h"
#include "utils/ag_cache.h"
#ifndef AGE_ENTITY_CREATOR_H
#define AGE_ENTITY_CREATOR_H
+#define TEMP_VERTEX_ID_TABLE_SUFFIX "_ag_vertex_ids"
+#define GET_TEMP_VERTEX_ID_TABLE(graph_name) \
+ psprintf("_%s%s", graph_name, TEMP_VERTEX_ID_TABLE_SUFFIX)
+
+#define BATCH_SIZE 1000
+
+typedef struct
+{
+ HeapTuple *buffered_tuples; /* tuples insert_batch() writes to the label relation */
+ HeapTuple *buffered_id_tuples; /* id tuples the vertex loader writes to the temp id table */
+ TupleDesc desc; /* copy of the label relation's tuple descriptor */
+ TupleDesc id_desc; /* copy of the temp id table's tuple descriptor */
+ int num_tuples; /* number of tuples currently buffered */
+ int max_tuples; /* set to BATCH_SIZE by the vertex loader */
+} batch_insert_state;
+
agtype* create_empty_agtype(void);
agtype* create_agtype_from_list(char **header, char **fields,
@@ -42,5 +59,7 @@ void insert_vertex_simple(Oid graph_id, char *label_name,
graphid vertex_id,
void insert_edge_simple(Oid graph_id, char *label_name, graphid edge_id,
graphid start_id, graphid end_id,
agtype* end_properties);
+void insert_batch(batch_insert_state *batch_state, char *label_name,
+ Oid graph_oid);
#endif //AGE_ENTITY_CREATOR_H