Author: dcreager
Date: Fri Feb 25 16:22:55 2011
New Revision: 1074618
URL: http://svn.apache.org/viewvc?rev=1074618&view=rev
Log:
AVRO-762. C: Recursive schema resolution
The avro_resolver_new function now works correctly with recursive
schemas. We had to memoize the results of avro_resolver_new, so that we
can detect when we've already created an avro_resolver_t instance for a
pair of schemas. This prevents us from going into an infinite loop
trying to resolve a recursive schema.
This memoization means that there might be multiple references to a
resolver within the graph of resolvers for a recursive schema type.
When freeing this graph of resolvers, we now have to take care to only
free each instance once. We do this by maintaining a set of the
resolvers that we've already encountered during the free operation, and
immediately returning if we try to start freeing a resolver object a
second time.
Modified:
avro/trunk/lang/c/src/avro.h
avro/trunk/lang/c/src/consumer.c
avro/trunk/lang/c/src/resolver.c
avro/trunk/lang/c/src/schema.c
avro/trunk/lang/c/tests/test_avro_data.c
Modified: avro/trunk/lang/c/src/avro.h
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro.h?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro.h (original)
+++ avro/trunk/lang/c/src/avro.h Fri Feb 25 16:22:55 2011
@@ -177,6 +177,7 @@ avro_schema_t avro_schema_union_branch_b
(avro_schema_t union_schema, int *branch_index, const char *name);
avro_schema_t avro_schema_link(avro_schema_t schema);
+avro_schema_t avro_schema_link_target(avro_schema_t schema);
typedef struct avro_schema_error_t_ *avro_schema_error_t;
int avro_schema_from_json(const char *jsontext,
Modified: avro/trunk/lang/c/src/consumer.c
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/consumer.c?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/src/consumer.c (original)
+++ avro/trunk/lang/c/src/consumer.c Fri Feb 25 16:22:55 2011
@@ -19,7 +19,5 @@
void avro_consumer_free(avro_consumer_t *consumer)
{
- if (consumer->free) {
- consumer->free(consumer);
- }
+ consumer->free(consumer);
}
Modified: avro/trunk/lang/c/src/resolver.c
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/resolver.c?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/src/resolver.c (original)
+++ avro/trunk/lang/c/src/resolver.c Fri Feb 25 16:22:55 2011
@@ -24,6 +24,7 @@
#include "avro_errors.h"
#include "avro_private.h"
#include "allocation.h"
+#include "st.h"
#if !defined(DEBUG_RESOLVER)
@@ -66,10 +67,31 @@ struct avro_resolver_t {
};
+/**
+ * Frees a resolver object, while ensuring that all of the resolvers in
+ * a graph of resolvers is only freed once.
+ */
+
static void
-avro_resolver_free(avro_consumer_t *consumer)
+avro_resolver_free_cycles(avro_consumer_t *consumer, st_table *freeing)
{
avro_resolver_t *resolver = (avro_resolver_t *) consumer;
+
+ /*
+ * First check if we've already started freeing this resolver.
+ */
+
+ if (st_lookup(freeing, (st_data_t) resolver, NULL)) {
+ return;
+ }
+
+ /*
+ * Otherwise add this resolver to the freeing set, and then
+ * actually free the thing.
+ */
+
+ st_insert(freeing, (st_data_t) resolver, (st_data_t) NULL);
+
avro_schema_decref(resolver->parent.schema);
avro_schema_decref(resolver->rschema);
if (resolver->child_resolvers) {
@@ -77,7 +99,7 @@ avro_resolver_free(avro_consumer_t *cons
for (i = 0; i < resolver->num_children; i++) {
avro_consumer_t *child = resolver->child_resolvers[i];
if (child) {
- avro_consumer_free(child);
+ avro_resolver_free_cycles(child, freeing);
}
}
avro_free(resolver->child_resolvers,
@@ -91,6 +113,14 @@ avro_resolver_free(avro_consumer_t *cons
}
+static void
+avro_resolver_free(avro_consumer_t *consumer)
+{
+ st_table *freeing = st_init_numtable();
+ avro_resolver_free_cycles(consumer, freeing);
+ st_free_table(freeing);
+}
+
/**
* Create a new avro_resolver_t instance. You must fill in the callback
* pointers that are appropriate for the writer schema after this
@@ -135,6 +165,68 @@ avro_resolver_get_real_dest(avro_resolve
}
+#define skip_links(schema) \
+ while (is_avro_link(schema)) { \
+ schema = avro_schema_link_target(schema); \
+ }
+
+
+/*-----------------------------------------------------------------------
+ * Memoized resolvers
+ */
+
+static int
+avro_resolver_cmp(avro_resolver_t *a, avro_resolver_t *b)
+{
+ return (a->parent.schema != b->parent.schema) || (a->rschema != b->rschema);
+}
+
+static int
+avro_resolver_hash(avro_resolver_t *a)
+{
+ return ((uintptr_t) a->parent.schema) ^ ((uintptr_t) a->rschema);
+}
+
+static struct st_hash_type avro_resolver_hash_type = {
+ avro_resolver_cmp,
+ avro_resolver_hash
+};
+
+static void
+save_resolver(st_table *resolvers, avro_resolver_t *resolver)
+{
+ st_insert(resolvers, (st_data_t) resolver, (st_data_t) resolver);
+}
+
+static void
+delete_resolver(st_table *resolvers, avro_resolver_t *resolver)
+{
+ st_delete(resolvers, (st_data_t *) &resolver, (st_data_t *) &resolver);
+}
+
+static avro_resolver_t *
+find_resolver(st_table *resolvers, avro_schema_t wschema, avro_schema_t rschema)
+{
+ avro_resolver_t dummy;
+ dummy.parent.schema = wschema;
+ dummy.rschema = rschema;
+ union {
+ st_data_t data;
+ avro_resolver_t *resolver;
+ } val;
+ if (st_lookup(resolvers, (st_data_t) &dummy, &val.data)) {
+ return val.resolver;
+ } else {
+ return NULL;
+ }
+}
+
+
+static avro_consumer_t *
+avro_resolver_new_memoized(st_table *resolvers,
+ avro_schema_t wschema, avro_schema_t rschema);
+
+
/*-----------------------------------------------------------------------
* Reader unions
*/
@@ -153,10 +245,10 @@ avro_resolver_get_real_dest(avro_resolve
* compatible.
*/
-#define check_non_union(wschema, rschema, check_func) \
+#define check_non_union(saved, wschema, rschema, check_func) \
do { \
avro_resolver_t *self = NULL; \
- int rc = check_func(&self, wschema, rschema, \
+ int rc = check_func(saved, &self, wschema, rschema, \
rschema); \
if (self) { \
debug("Non-union schemas %s (writer) " \
@@ -182,7 +274,7 @@ do { \
* check the compatiblity of each branch schema.
*/
-#define check_reader_union(wschema, rschema, check_func) \
+#define check_reader_union(saved, wschema, rschema, check_func) \
do { \
if (!is_avro_union(rschema)) { \
break; \
@@ -195,8 +287,10 @@ do { \
for (i = 0; i < num_branches; i++) { \
avro_schema_t branch_schema = \
avro_schema_union_branch(rschema, i); \
+ skip_links(branch_schema); \
avro_resolver_t *self = NULL; \
- int rc = check_func(&self, wschema, branch_schema, \
+ int rc = check_func(saved, &self, \
+ wschema, branch_schema, \
rschema); \
if (self) { \
debug("Reader union branch %d (%s) " \
@@ -205,6 +299,10 @@ do { \
avro_schema_type_name(wschema)); \
self->reader_union_branch = i; \
return &self->parent; \
+ } else { \
+ debug("Reader union branch %d (%s) " \
+ "doesn't match", \
+ i, avro_schema_type_name(branch_schema)); \
} \
\
if (rc) { \
@@ -220,10 +318,10 @@ do { \
* check_reader_union for a simple (non-union) writer schema type.
*/
-#define check_simple_writer(wschema, rschema, type_name) \
+#define check_simple_writer(saved, wschema, rschema, type_name) \
do { \
- check_non_union(wschema, rschema, try_##type_name); \
- check_reader_union(wschema, rschema, try_##type_name); \
+ check_non_union(saved, wschema, rschema, try_##type_name); \
+ check_reader_union(saved, wschema, rschema, try_##type_name); \
debug("Writer %s doesn't match reader %s", \
avro_schema_type_name(wschema), \
avro_schema_type_name(rschema)); \
@@ -249,12 +347,13 @@ avro_resolver_boolean_value(avro_consume
}
static int
-try_boolean(avro_resolver_t **resolver,
+try_boolean(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
if (is_avro_boolean(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.boolean_value = avro_resolver_boolean_value;
}
return 0;
@@ -284,12 +383,13 @@ avro_resolver_bytes_value(avro_consumer_
}
static int
-try_bytes(avro_resolver_t **resolver,
+try_bytes(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
if (is_avro_bytes(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.bytes_value = avro_resolver_bytes_value;
}
return 0;
@@ -308,12 +408,13 @@ avro_resolver_double_value(avro_consumer
}
static int
-try_double(avro_resolver_t **resolver,
+try_double(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
if (is_avro_double(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.double_value = avro_resolver_double_value;
}
return 0;
@@ -343,16 +444,18 @@ avro_resolver_float_double_value(avro_co
}
static int
-try_float(avro_resolver_t **resolver,
+try_float(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
if (is_avro_float(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.float_value = avro_resolver_float_value;
}
else if (is_avro_double(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.float_value =
avro_resolver_float_double_value;
}
return 0;
@@ -404,24 +507,28 @@ avro_resolver_int_float_value(avro_consu
}
static int
-try_int(avro_resolver_t **resolver,
+try_int(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
if (is_avro_int32(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.int_value = avro_resolver_int_value;
}
else if (is_avro_int64(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.int_value = avro_resolver_int_long_value;
}
else if (is_avro_double(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.int_value = avro_resolver_int_double_value;
}
else if (is_avro_float(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.int_value = avro_resolver_int_float_value;
}
return 0;
@@ -462,20 +569,23 @@ avro_resolver_long_double_value(avro_con
}
static int
-try_long(avro_resolver_t **resolver,
+try_long(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
if (is_avro_int64(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.long_value = avro_resolver_long_value;
}
else if (is_avro_double(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.long_value =
avro_resolver_long_double_value;
}
else if (is_avro_float(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.long_value = avro_resolver_long_float_value;
}
return 0;
@@ -485,25 +595,23 @@ try_long(avro_resolver_t **resolver,
static int
avro_resolver_null_value(avro_consumer_t *consumer, void *user_data)
{
-#if DEBUG_RESOLVER
avro_resolver_t *resolver = (avro_resolver_t *) consumer;
avro_datum_t ud_dest = user_data;
avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);
+
+ AVRO_UNUSED(dest);
debug("Storing null into %p", dest);
-#else
- AVRO_UNUSED(consumer);
- AVRO_UNUSED(user_data);
-#endif
return 0;
}
static int
-try_null(avro_resolver_t **resolver,
+try_null(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
if (is_avro_null(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.null_value = avro_resolver_null_value;
}
return 0;
@@ -524,12 +632,13 @@ avro_resolver_string_value(avro_consumer
}
static int
-try_string(avro_resolver_t **resolver,
+try_string(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
if (is_avro_string(rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.string_value = avro_resolver_string_value;
}
return 0;
@@ -546,18 +655,14 @@ avro_resolver_array_start_block(avro_con
unsigned int block_count,
void *user_data)
{
-#if DEBUG_RESOLVER
if (is_first_block) {
avro_resolver_t *resolver = (avro_resolver_t *) consumer;
avro_datum_t ud_dest = user_data;
avro_datum_t dest = avro_resolver_get_real_dest(resolver,
ud_dest);
+ AVRO_UNUSED(dest);
+
debug("Starting array %p", dest);
}
-#else
- AVRO_UNUSED(consumer);
- AVRO_UNUSED(user_data);
- AVRO_UNUSED(is_first_block);
-#endif
AVRO_UNUSED(block_count);
return 0;
@@ -580,6 +685,7 @@ avro_resolver_array_element(avro_consume
/*
* Allocate a new element datum and add it to the array.
*/
+
avro_schema_t array_schema = avro_datum_get_schema(dest);
avro_schema_t item_schema = avro_schema_array_items(array_schema);
avro_datum_t element = avro_datum_from_schema(item_schema);
@@ -590,13 +696,14 @@ avro_resolver_array_element(avro_consume
* Return the consumer that we allocated to process the array's
* children.
*/
+
*element_consumer = resolver->child_resolvers[0];
*element_user_data = element;
return 0;
}
static int
-try_array(avro_resolver_t **resolver,
+try_array(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
@@ -614,11 +721,17 @@ try_array(avro_resolver_t **resolver,
* check the compatibility.
*/
+ *resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
+
avro_schema_t witems = avro_schema_array_items(wschema);
avro_schema_t ritems = avro_schema_array_items(rschema);
- avro_consumer_t *item_consumer = avro_resolver_new(witems, ritems);
+ avro_consumer_t *item_consumer =
+ avro_resolver_new_memoized(resolvers, witems, ritems);
if (!item_consumer) {
+ delete_resolver(resolvers, *resolver);
+ avro_consumer_free(&(*resolver)->parent);
avro_prefix_error("Array values aren't compatible: ");
return EINVAL;
}
@@ -629,7 +742,6 @@ try_array(avro_resolver_t **resolver,
* resolver into the child_resolvers field.
*/
- *resolver = avro_resolver_create(wschema, root_rschema);
(*resolver)->num_children = 1;
(*resolver)->child_resolvers = avro_calloc(1, sizeof(avro_consumer_t *));
(*resolver)->child_resolvers[0] = item_consumer;
@@ -660,7 +772,7 @@ avro_resolver_enum_value(avro_consumer_t
}
static int
-try_enum(avro_resolver_t **resolver,
+try_enum(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
@@ -675,6 +787,7 @@ try_enum(avro_resolver_t **resolver,
if (!strcmp(wname, rname)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.enum_value =
avro_resolver_enum_value;
}
}
@@ -699,7 +812,7 @@ avro_resolver_fixed_value(avro_consumer_
}
static int
-try_fixed(avro_resolver_t **resolver,
+try_fixed(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
@@ -709,6 +822,7 @@ try_fixed(avro_resolver_t **resolver,
if (avro_schema_equal(wschema, rschema)) {
*resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
(*resolver)->parent.fixed_value = avro_resolver_fixed_value;
}
return 0;
@@ -725,18 +839,14 @@ avro_resolver_map_start_block(avro_consu
unsigned int block_count,
void *user_data)
{
-#if DEBUG_RESOLVER
if (is_first_block) {
avro_resolver_t *resolver = (avro_resolver_t *) consumer;
avro_datum_t ud_dest = user_data;
avro_datum_t dest = avro_resolver_get_real_dest(resolver,
ud_dest);
+ AVRO_UNUSED(dest);
+
debug("Starting map %p", dest);
}
-#else
- AVRO_UNUSED(consumer);
- AVRO_UNUSED(user_data);
- AVRO_UNUSED(is_first_block);
-#endif
AVRO_UNUSED(block_count);
return 0;
@@ -778,7 +888,7 @@ avro_resolver_map_element(avro_consumer_
}
static int
-try_map(avro_resolver_t **resolver,
+try_map(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
@@ -796,11 +906,17 @@ try_map(avro_resolver_t **resolver,
* check the compatibility.
*/
+ *resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
+
avro_schema_t wvalues = avro_schema_map_values(wschema);
avro_schema_t rvalues = avro_schema_map_values(rschema);
- avro_consumer_t *value_consumer = avro_resolver_new(wvalues, rvalues);
+ avro_consumer_t *value_consumer =
+ avro_resolver_new_memoized(resolvers, wvalues, rvalues);
if (!value_consumer) {
+ delete_resolver(resolvers, *resolver);
+ avro_consumer_free(&(*resolver)->parent);
avro_prefix_error("Map values aren't compatible: ");
return EINVAL;
}
@@ -811,7 +927,6 @@ try_map(avro_resolver_t **resolver,
* resolver into the child_resolvers field.
*/
- *resolver = avro_resolver_create(wschema, root_rschema);
(*resolver)->num_children = 1;
(*resolver)->child_resolvers = avro_calloc(1, sizeof(avro_consumer_t *));
(*resolver)->child_resolvers[0] = value_consumer;
@@ -830,15 +945,12 @@ static int
avro_resolver_record_start(avro_consumer_t *consumer,
void *user_data)
{
-#if DEBUG_RESOLVER
avro_resolver_t *resolver = (avro_resolver_t *) consumer;
avro_datum_t ud_dest = user_data;
avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);
+ AVRO_UNUSED(dest);
+
debug("Starting record at %p", dest);
-#else
- AVRO_UNUSED(consumer);
- AVRO_UNUSED(user_data);
-#endif
/*
* TODO: Eventually, we'll fill in default values for the extra
@@ -890,7 +1002,7 @@ avro_resolver_record_field(avro_consumer
}
static int
-try_record(avro_resolver_t **resolver,
+try_record(st_table *resolvers, avro_resolver_t **resolver,
avro_schema_t wschema, avro_schema_t rschema,
avro_schema_t root_rschema)
{
@@ -929,6 +1041,9 @@ try_record(avro_resolver_t **resolver,
* skipped when processing the input.
*/
+ *resolver = avro_resolver_create(wschema, root_rschema);
+ save_resolver(resolvers, *resolver);
+
size_t wfields = avro_schema_record_size(wschema);
size_t rfields = avro_schema_record_size(rschema);
@@ -973,7 +1088,8 @@ try_record(avro_resolver_t **resolver,
avro_schema_t wfield =
avro_schema_record_field_get_by_index(wschema, wi);
- avro_consumer_t *field_resolver = avro_resolver_new(wfield, rfield);
+ avro_consumer_t *field_resolver =
+ avro_resolver_new_memoized(resolvers, wfield, rfield);
if (!field_resolver) {
avro_prefix_error("Field %s isn't compatible: ",
field_name);
@@ -995,7 +1111,6 @@ try_record(avro_resolver_t **resolver,
* but that's okay — any extras will be ignored.
*/
- *resolver = avro_resolver_create(wschema, root_rschema);
(*resolver)->num_children = wfields;
(*resolver)->child_resolvers = child_resolvers;
(*resolver)->index_mapping = index_mapping;
@@ -1008,6 +1123,9 @@ error:
* Clean up any consumer we might have already created.
*/
+ delete_resolver(resolvers, *resolver);
+ avro_consumer_free(&(*resolver)->parent);
+
{
unsigned int i;
for (i = 0; i < wfields; i++) {
@@ -1060,7 +1178,7 @@ avro_resolver_union_branch(avro_consumer
}
static avro_consumer_t *
-try_union(avro_schema_t wschema, avro_schema_t rschema)
+try_union(st_table *resolvers, avro_schema_t wschema, avro_schema_t rschema)
{
/*
* For a writer union, we recursively try to resolve each branch
@@ -1085,6 +1203,9 @@ try_union(avro_schema_t wschema, avro_sc
size_t num_branches = avro_schema_union_size(wschema);
debug("Checking %zu-branch writer union schema", num_branches);
+ avro_resolver_t *resolver = avro_resolver_create(wschema, rschema);
+ save_resolver(resolvers, resolver);
+
avro_consumer_t **child_resolvers =
avro_calloc(num_branches, sizeof(avro_consumer_t *));
int some_branch_compatible = 0;
@@ -1105,7 +1226,8 @@ try_union(avro_schema_t wschema, avro_sc
* appear in the input.
*/
- child_resolvers[i] = avro_resolver_new(branch_schema, rschema);
+ child_resolvers[i] =
+ avro_resolver_new_memoized(resolvers, branch_schema, rschema);
if (child_resolvers[i]) {
debug("Found match for writer union branch %u", i);
some_branch_compatible = 1;
@@ -1128,7 +1250,6 @@ try_union(avro_schema_t wschema, avro_sc
goto error;
}
- avro_resolver_t *resolver = avro_resolver_create(wschema, rschema);
resolver->num_children = num_branches;
resolver->child_resolvers = child_resolvers;
resolver->parent.union_branch = avro_resolver_union_branch;
@@ -1139,6 +1260,9 @@ error:
* Clean up any consumer we might have already created.
*/
+ delete_resolver(resolvers, resolver);
+ avro_consumer_free(&resolver->parent);
+
for (i = 0; i < num_branches; i++) {
if (child_resolvers[i]) {
avro_consumer_free(child_resolvers[i]);
@@ -1154,68 +1278,89 @@ error:
* schema type dispatcher
*/
-avro_consumer_t *
-avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema)
+static avro_consumer_t *
+avro_resolver_new_memoized(st_table *resolvers,
+ avro_schema_t wschema, avro_schema_t rschema)
{
check_param(NULL, is_avro_schema(wschema), "writer schema");
check_param(NULL, is_avro_schema(rschema), "reader schema");
+ skip_links(wschema);
+ skip_links(rschema);
+
+ /*
+ * First see if we've already matched these two schemas. If so,
+ * just return that resolver.
+ */
+
+ avro_resolver_t *saved = find_resolver(resolvers, wschema, rschema);
+ if (saved) {
+ debug("Already resolved %s and %s",
+ avro_schema_type_name(wschema),
+ avro_schema_type_name(rschema));
+ return &saved->parent;
+ }
+
+ /*
+ * Otherwise we have some work to do.
+ */
+
switch (avro_typeof(wschema))
{
case AVRO_BOOLEAN:
- check_simple_writer(wschema, rschema, boolean);
+ check_simple_writer(resolvers, wschema, rschema, boolean);
return NULL;
case AVRO_BYTES:
- check_simple_writer(wschema, rschema, bytes);
+ check_simple_writer(resolvers, wschema, rschema, bytes);
return NULL;
case AVRO_DOUBLE:
- check_simple_writer(wschema, rschema, double);
+ check_simple_writer(resolvers, wschema, rschema, double);
return NULL;
case AVRO_FLOAT:
- check_simple_writer(wschema, rschema, float);
+ check_simple_writer(resolvers, wschema, rschema, float);
return NULL;
case AVRO_INT32:
- check_simple_writer(wschema, rschema, int);
+ check_simple_writer(resolvers, wschema, rschema, int);
return NULL;
case AVRO_INT64:
- check_simple_writer(wschema, rschema, long);
+ check_simple_writer(resolvers, wschema, rschema, long);
return NULL;
case AVRO_NULL:
- check_simple_writer(wschema, rschema, null);
+ check_simple_writer(resolvers, wschema, rschema, null);
return NULL;
case AVRO_STRING:
- check_simple_writer(wschema, rschema, string);
+ check_simple_writer(resolvers, wschema, rschema, string);
return NULL;
case AVRO_ARRAY:
- check_simple_writer(wschema, rschema, array);
+ check_simple_writer(resolvers, wschema, rschema, array);
return NULL;
case AVRO_ENUM:
- check_simple_writer(wschema, rschema, enum);
+ check_simple_writer(resolvers, wschema, rschema, enum);
return NULL;
case AVRO_FIXED:
- check_simple_writer(wschema, rschema, fixed);
+ check_simple_writer(resolvers, wschema, rschema, fixed);
return NULL;
case AVRO_MAP:
- check_simple_writer(wschema, rschema, map);
+ check_simple_writer(resolvers, wschema, rschema, map);
return NULL;
case AVRO_RECORD:
- check_simple_writer(wschema, rschema, record);
+ check_simple_writer(resolvers, wschema, rschema, record);
return NULL;
case AVRO_UNION:
- return try_union(wschema, rschema);
+ return try_union(resolvers, wschema, rschema);
default:
avro_set_error("Unknown schema type");
@@ -1224,3 +1369,14 @@ avro_resolver_new(avro_schema_t wschema,
return NULL;
}
+
+
+avro_consumer_t *
+avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema)
+{
+ st_table *resolvers = st_init_table(&avro_resolver_hash_type);
+ avro_consumer_t *result =
+ avro_resolver_new_memoized(resolvers, wschema, rschema);
+ st_free_table(resolvers);
+ return result;
+}
Modified: avro/trunk/lang/c/src/schema.c
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/schema.c?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/src/schema.c (original)
+++ avro/trunk/lang/c/src/schema.c Fri Feb 25 16:22:55 2011
@@ -695,6 +695,15 @@ avro_schema_t avro_schema_link(avro_sche
return &link->obj;
}
+avro_schema_t avro_schema_link_target(avro_schema_t schema)
+{
+ check_param(NULL, is_avro_schema(schema), "schema");
+ check_param(NULL, is_avro_link(schema), "schema");
+
+ struct avro_link_schema_t *link = avro_schema_to_link(schema);
+ return link->to;
+}
+
static int
avro_type_from_json_t(json_t * json, avro_type_t * type,
avro_schema_error_t * error, avro_schema_t * named_type)
@@ -1320,37 +1329,40 @@ const char *avro_schema_name(const avro_
const char *avro_schema_type_name(const avro_schema_t schema)
{
- if (is_avro_record(schema)) {
- return (avro_schema_to_record(schema))->name;
- } else if (is_avro_enum(schema)) {
- return (avro_schema_to_enum(schema))->name;
- } else if (is_avro_fixed(schema)) {
- return (avro_schema_to_fixed(schema))->name;
- } else if (is_avro_union(schema)) {
- return "union";
- } else if (is_avro_array(schema)) {
- return "array";
- } else if (is_avro_map(schema)) {
- return "map";
- } else if (is_avro_int32(schema)) {
- return "int";
- } else if (is_avro_int64(schema)) {
- return "long";
- } else if (is_avro_float(schema)) {
- return "float";
- } else if (is_avro_double(schema)) {
- return "double";
- } else if (is_avro_boolean(schema)) {
- return "boolean";
- } else if (is_avro_null(schema)) {
- return "null";
- } else if (is_avro_string(schema)) {
- return "string";
- } else if (is_avro_bytes(schema)) {
- return "bytes";
- }
- avro_set_error("Unknown schema type");
- return NULL;
+ if (is_avro_record(schema)) {
+ return (avro_schema_to_record(schema))->name;
+ } else if (is_avro_enum(schema)) {
+ return (avro_schema_to_enum(schema))->name;
+ } else if (is_avro_fixed(schema)) {
+ return (avro_schema_to_fixed(schema))->name;
+ } else if (is_avro_union(schema)) {
+ return "union";
+ } else if (is_avro_array(schema)) {
+ return "array";
+ } else if (is_avro_map(schema)) {
+ return "map";
+ } else if (is_avro_int32(schema)) {
+ return "int";
+ } else if (is_avro_int64(schema)) {
+ return "long";
+ } else if (is_avro_float(schema)) {
+ return "float";
+ } else if (is_avro_double(schema)) {
+ return "double";
+ } else if (is_avro_boolean(schema)) {
+ return "boolean";
+ } else if (is_avro_null(schema)) {
+ return "null";
+ } else if (is_avro_string(schema)) {
+ return "string";
+ } else if (is_avro_bytes(schema)) {
+ return "bytes";
+ } else if (is_avro_link(schema)) {
+ avro_schema_t target = avro_schema_link_target(schema);
+ return avro_schema_type_name(target);
+ }
+ avro_set_error("Unknown schema type");
+ return NULL;
}
avro_datum_t avro_datum_from_schema(const avro_schema_t schema)
Modified: avro/trunk/lang/c/tests/test_avro_data.c
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/tests/test_avro_data.c?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/tests/test_avro_data.c (original)
+++ avro/trunk/lang/c/tests/test_avro_data.c Fri Feb 25 16:22:55 2011
@@ -382,6 +382,48 @@ static int test_record(void)
return 0;
}
+static int test_nested_record(void)
+{
+ const char *json =
+ "{"
+ " \"type\": \"record\","
+ " \"name\": \"list\","
+ " \"fields\": ["
+ " { \"name\": \"x\", \"type\": \"int\" },"
+ " { \"name\": \"y\", \"type\": \"int\" },"
+ " { \"name\": \"next\", \"type\": [\"null\",\"list\"]}"
+ " ]"
+ "}";
+
+ int rval;
+
+ avro_schema_t schema = NULL;
+ avro_schema_error_t error;
+ avro_schema_from_json(json, strlen(json), &schema, &error);
+
+ avro_datum_t head = avro_datum_from_schema(schema);
+ avro_record_set_field_value(rval, head, int32, "x", 10);
+ avro_record_set_field_value(rval, head, int32, "y", 10);
+
+ avro_datum_t next = NULL;
+ avro_datum_t tail = NULL;
+
+ avro_record_get(head, "next", &next);
+ avro_union_set_discriminant(next, 1, &tail);
+ avro_record_set_field_value(rval, tail, int32, "x", 20);
+ avro_record_set_field_value(rval, tail, int32, "y", 20);
+
+ avro_record_get(tail, "next", &next);
+ avro_union_set_discriminant(next, 0, NULL);
+
+ write_read_check(schema, head, NULL, NULL, "nested record");
+
+ avro_schema_decref(schema);
+ avro_datum_decref(head);
+
+ return 0;
+}
+
static int test_enum(void)
{
enum avro_languages {
@@ -604,6 +646,7 @@ int main(void)
"boolean", test_boolean}, {
"null", test_null}, {
"record", test_record}, {
+ "nested_record", test_nested_record}, {
"enum", test_enum}, {
"array", test_array}, {
"map", test_map}, {