This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 1faa589b18b6962c8750c55e4d9b5c4c08ce3353
Author: Andrew Stitcher <astitc...@apache.org>
AuthorDate: Wed Jun 23 14:53:04 2021 -0400

    PROTON-2451: Add code to emit amqp values
    
    Including optimizations to generate LIST8/MAP8 for short lists and maps;
    produce the list0 shortcode for empty lists; and omit nulls at the end
    of described lists (This has a fairly large effect on the size of message
    encoded in the transfer performatives.)
---
 c/src/core/emitters.h | 528 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 528 insertions(+)

diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h
new file mode 100644
index 0000000..b1a6805
--- /dev/null
+++ b/c/src/core/emitters.h
@@ -0,0 +1,528 @@
+#ifndef PROTON_EMITTERS_H
+#define PROTON_EMITTERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Definitions of AMQP type codes */
+#include "encodings.h"
+#include "buffer.h"
+
+#include <proton/codec.h>
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <string.h>
+
+typedef struct current {
+  struct current* previous_compound;
+  size_t size_position;
+  size_t start_position;
+  size_t count;
+  uint32_t null_count;
+  uint8_t type;
+  bool encoding_succeeded;
+  bool is_described_list;
+} pni_compound_context;
+
+static inline pni_compound_context make_compound(void) {
+  return (pni_compound_context){
+    .count = 0
+  };
+}
+
+typedef struct pni_emitter_t {
+  char* output_start;
+  size_t size;
+  size_t position;
+} pni_emitter_t;
+
+static inline pni_emitter_t make_emitter_from_buffer(pn_buffer_t* buffer) {
+  pn_rwbytes_t output_bytes = pn_buffer_free_memory(buffer);
+  return (pni_emitter_t){
+    .output_start = output_bytes.start,
+    .size = output_bytes.size,
+    .position = 0
+  };
+}
+
+static inline pni_emitter_t make_emitter_from_bytes(pn_rwbytes_t output_bytes) 
{
+  return (pni_emitter_t){
+    .output_start = output_bytes.start,
+    .size = output_bytes.size,
+    .position = 0
+  };
+}
+
+static inline pn_bytes_t make_bytes_from_emitter(pni_emitter_t emitter) {
+    return (pn_bytes_t){.size = emitter.position, .start = 
emitter.output_start};
+}
+
+static inline bool resize_required(pni_emitter_t* emitter) {
+  return emitter->position > emitter->size;
+}
+
+static inline void size_buffer_to_emitter(pn_buffer_t* buffer, pni_emitter_t* 
emitter) {
+  pn_buffer_ensure(buffer, 
pn_buffer_capacity(buffer)+(emitter->position-emitter->size));
+}
+
+static inline bool encode_succeeded(pni_emitter_t* emitter, 
pni_compound_context* compound) {
+  return compound->encoding_succeeded;
+}
+
+static inline bool pni_emitter_remaining(pni_emitter_t* e, size_t need) {
+  return (e->size >= e->position+need);
+}
+
+static inline void pni_emitter_writef8(pni_emitter_t* emitter, uint8_t value)
+{
+  if (pni_emitter_remaining(emitter, 1)) {
+    emitter->output_start[emitter->position+0] = value;
+  }
+  emitter->position++;
+}
+
+static inline void pni_emitter_writef16(pni_emitter_t* emitter, uint16_t value)
+{
+  if (pni_emitter_remaining(emitter, 2)) {
+    emitter->output_start[emitter->position+0] = 0xFF & (value >> 8);
+    emitter->output_start[emitter->position+1] = 0xFF & (value     );
+  }
+  emitter->position += 2;
+}
+
+static inline void pni_emitter_writef32(pni_emitter_t* emitter, uint32_t value)
+{
+  if (pni_emitter_remaining(emitter, 4)) {
+    emitter->output_start[emitter->position+0] = 0xFF & (value >> 24);
+    emitter->output_start[emitter->position+1] = 0xFF & (value >> 16);
+    emitter->output_start[emitter->position+2] = 0xFF & (value >>  8);
+    emitter->output_start[emitter->position+3] = 0xFF & (value      );
+  }
+  emitter->position += 4;
+}
+
+static inline void pni_emitter_writef64(pni_emitter_t* emitter, uint64_t 
value) {
+  if (pni_emitter_remaining(emitter, 8)) {
+    emitter->output_start[emitter->position+0] = 0xFF & (value >> 56);
+    emitter->output_start[emitter->position+1] = 0xFF & (value >> 48);
+    emitter->output_start[emitter->position+2] = 0xFF & (value >> 40);
+    emitter->output_start[emitter->position+3] = 0xFF & (value >> 32);
+    emitter->output_start[emitter->position+4] = 0xFF & (value >> 24);
+    emitter->output_start[emitter->position+5] = 0xFF & (value >> 16);
+    emitter->output_start[emitter->position+6] = 0xFF & (value >>  8);
+    emitter->output_start[emitter->position+7] = 0xFF & (value      );
+  }
+  emitter->position += 8;
+}
+
+static inline void pni_emitter_writef128(pni_emitter_t* emitter, void *value) {
+  if (pni_emitter_remaining(emitter, 16)) {
+    memcpy(emitter->output_start+emitter->position, value, 16);
+  }
+  emitter->position += 16;
+}
+
+static inline void pni_emitter_writev8(pni_emitter_t* emitter, const 
pn_bytes_t value)
+{
+  pni_emitter_writef8(emitter, value.size);
+  if (pni_emitter_remaining(emitter, value.size))
+    memcpy(emitter->output_start+emitter->position, value.start, value.size);
+  emitter->position += value.size;
+}
+
+static inline void pni_emitter_writev32(pni_emitter_t* emitter, const 
pn_bytes_t value)
+{
+  pni_emitter_writef32(emitter, value.size);
+  if (pni_emitter_remaining(emitter, value.size))
+    memcpy(emitter->output_start+emitter->position, value.start, value.size);
+  emitter->position += value.size;
+}
+
+///////////////////////////////////////////////////////////////////////////////////////////////////
+
+static inline void emit_null(pni_emitter_t* emitter, pni_compound_context* 
compound) {
+  if (compound->is_described_list) {
+    compound->null_count++;
+    return;
+  }
+  pni_emitter_writef8(emitter, PNE_NULL);
+  compound->count++;
+}
+
+static inline void emit_accumulated_nulls(pni_emitter_t* emitter, 
pni_compound_context* compound) {
+  for (uint32_t i=compound->null_count; i>0; --i) {
+    pni_emitter_writef8(emitter, PNE_NULL);
+    compound->count++;
+  }
+  compound->null_count = 0;
+}
+
+static inline void emit_bool(pni_emitter_t* emitter, pni_compound_context* 
compound, bool b) {
+  emit_accumulated_nulls(emitter, compound);
+  if (b) {
+    pni_emitter_writef8(emitter, PNE_TRUE);
+  } else {
+    pni_emitter_writef8(emitter, PNE_FALSE);
+  }
+  compound->count++;
+}
+
+static inline void emit_ubyte(pni_emitter_t* emitter, pni_compound_context* 
compound, uint8_t ubyte) {
+  emit_accumulated_nulls(emitter, compound);
+  pni_emitter_writef8(emitter, PNE_UBYTE);
+  pni_emitter_writef8(emitter, ubyte);
+  compound->count++;
+}
+
+static inline void emit_ushort(pni_emitter_t* emitter, pni_compound_context* 
compound, uint16_t ushort) {
+  emit_accumulated_nulls(emitter, compound);
+  pni_emitter_writef8(emitter, PNE_USHORT);
+  pni_emitter_writef16(emitter, ushort);
+  compound->count++;
+}
+
+static inline void emit_uint(pni_emitter_t* emitter, pni_compound_context* 
compound, uint32_t uint) {
+  emit_accumulated_nulls(emitter, compound);
+  if (uint == 0) {
+    pni_emitter_writef8(emitter, PNE_UINT0);
+  } else if (uint < 256) {
+    pni_emitter_writef8(emitter, PNE_SMALLUINT);
+    pni_emitter_writef8(emitter, uint);
+  } else {
+    pni_emitter_writef8(emitter, PNE_UINT);
+    pni_emitter_writef32(emitter, uint);
+  }
+  compound->count++;
+}
+
+static inline void emit_ulong(pni_emitter_t* emitter, pni_compound_context* 
compound, uint64_t ulong) {
+  emit_accumulated_nulls(emitter, compound);
+  if (ulong == 0) {
+    pni_emitter_writef8(emitter, PNE_ULONG0);
+  } else if (ulong < 256) {
+    pni_emitter_writef8(emitter, PNE_SMALLULONG);
+    pni_emitter_writef8(emitter, ulong);
+  } else {
+    pni_emitter_writef8(emitter, PNE_ULONG);
+    pni_emitter_writef64(emitter, ulong);
+  }
+  compound->count++;
+}
+
+static inline void emit_timestamp(pni_emitter_t* emitter, 
pni_compound_context* compound, pn_timestamp_t timestamp) {
+  emit_accumulated_nulls(emitter, compound);
+  pni_emitter_writef8(emitter, PNE_MS64);
+  pni_emitter_writef64(emitter, timestamp);
+  compound->count++;
+}
+
+static inline void emit_descriptor(pni_emitter_t* emitter, 
pni_compound_context* compound, uint64_t ulong) {
+  emit_accumulated_nulls(emitter, compound);
+  pni_emitter_writef8(emitter, PNE_DESCRIPTOR);
+  if (ulong < 256) {
+    pni_emitter_writef8(emitter, PNE_SMALLULONG);
+    pni_emitter_writef8(emitter, ulong);
+  } else {
+    pni_emitter_writef8(emitter, PNE_ULONG);
+    pni_emitter_writef64(emitter, ulong);
+  }
+}
+
+static inline pni_compound_context emit_list(pni_emitter_t* emitter, 
pni_compound_context* compound, bool small_encoding, bool is_described_list) {
+  emit_accumulated_nulls(emitter, compound);
+  if (small_encoding) {
+    pni_emitter_writef8(emitter, PNE_LIST8);
+    // Need to fill in size and count later
+    size_t size_position = emitter->position;
+    pni_emitter_writef8(emitter, 0);
+    size_t start_position = emitter->position;
+    pni_emitter_writef8(emitter, 0);
+    return (pni_compound_context){
+      .previous_compound = compound,
+      .size_position = size_position,
+      .start_position = start_position,
+      .count = 0,
+      .null_count = 0,
+      .is_described_list = is_described_list,
+    };
+  } else {
+    pni_emitter_writef8(emitter, PNE_LIST32);
+    // Need to fill in size and count later
+    size_t size_position = emitter->position;
+    pni_emitter_writef32(emitter, 0);
+    size_t start_position = emitter->position;
+    pni_emitter_writef32(emitter, 0);
+    return (pni_compound_context){
+      .previous_compound = compound,
+      .size_position = size_position,
+      .start_position = start_position,
+      .count = 0,
+      .null_count = 0,
+      .is_described_list = is_described_list,
+    };
+  }
+}
+
+static inline void emit_end_list(pni_emitter_t* emitter, pni_compound_context* 
compound, bool small_encoding) {
+  // Check if list was 0 length
+  if (compound->count==0) {
+    emitter->position = compound->size_position - 1;
+    pni_emitter_writef8(emitter, PNE_LIST0);
+    compound->previous_compound->count++;
+    compound->encoding_succeeded = true;
+    return;
+  }
+  // Fill in size and count
+  size_t current = emitter->position;
+  emitter->position = compound->size_position;
+  size_t size = current-compound->start_position;
+  if (small_encoding) {
+    if (size>255 || compound->count>255) {
+      compound->encoding_succeeded = false;
+      emitter->position -= 1;
+      return;
+    }
+    pni_emitter_writef8(emitter, size);
+    pni_emitter_writef8(emitter, compound->count);
+  } else {
+    pni_emitter_writef32(emitter, size);
+    pni_emitter_writef32(emitter, compound->count);
+  }
+  emitter->position = current;
+  compound->previous_compound->count++;
+  compound->encoding_succeeded = true;
+}
+
+static inline pni_compound_context emit_array(pni_emitter_t* emitter, 
pni_compound_context* compound, bool small_encoding, pn_type_t type) {
+  emit_accumulated_nulls(emitter, compound);
+  if (small_encoding) {
+    pni_emitter_writef8(emitter, PNE_ARRAY8);
+    // Need to fill in size, count and type later
+    size_t size_position = emitter->position;
+    pni_emitter_writef8(emitter, 0);
+    size_t start_position = emitter->position;
+    pni_emitter_writef8(emitter, 0);
+    pni_emitter_writef8(emitter, 0);
+    return (pni_compound_context){
+      .previous_compound = compound,
+      .size_position = size_position,
+      .start_position = start_position,
+      .count = 0,
+    };
+  } else {
+    pni_emitter_writef8(emitter, PNE_ARRAY32);
+    // Need to fill in size, count and type later
+    size_t size_position = emitter->position;
+    pni_emitter_writef32(emitter, 0);
+    size_t start_position = emitter->position;
+    pni_emitter_writef32(emitter, 0);
+    pni_emitter_writef8(emitter, 0);
+    return (pni_compound_context){
+      .previous_compound = compound,
+      .size_position = size_position,
+      .start_position = start_position,
+      .count = 0,
+    };
+  }
+}
+
+static inline void emit_end_array(pni_emitter_t* emitter, 
pni_compound_context* compound, bool small_encoding) {
+  // Fill in size, count and type
+  size_t current = emitter->position;
+  emitter->position = compound->size_position;
+  size_t size = current-compound->start_position;
+  if (small_encoding) {
+    if (size>255 || compound->count>255) {
+      compound->encoding_succeeded = false;
+      emitter->position -= 1;
+      return;
+    }
+    pni_emitter_writef8(emitter, current-compound->start_position);
+    pni_emitter_writef8(emitter, compound->count);
+    pni_emitter_writef8(emitter, compound->type);
+  } else {
+    pni_emitter_writef32(emitter, current-compound->start_position);
+    pni_emitter_writef32(emitter, compound->count);
+    pni_emitter_writef8(emitter, compound->type);
+  }
+  emitter->position = current;
+  compound->previous_compound->count++;
+  compound->encoding_succeeded = true;
+}
+
+static inline void emit_binary_bytes(pni_emitter_t* emitter, 
pni_compound_context* compound, pn_bytes_t bytes) {
+  emit_accumulated_nulls(emitter, compound);
+  if (bytes.size < 256) {
+    pni_emitter_writef8(emitter, PNE_VBIN8);
+    pni_emitter_writev8(emitter, bytes);
+  } else {
+    pni_emitter_writef8(emitter, PNE_VBIN32);
+    pni_emitter_writev32(emitter, bytes);
+  }
+  compound->count++;
+}
+
+static inline void emit_string_bytes(pni_emitter_t* emitter, 
pni_compound_context* compound, pn_bytes_t bytes) {
+  emit_accumulated_nulls(emitter, compound);
+  if (bytes.size < 256) {
+    pni_emitter_writef8(emitter, PNE_STR8_UTF8);
+    pni_emitter_writev8(emitter, bytes);
+  } else {
+    pni_emitter_writef8(emitter, PNE_STR32_UTF8);
+    pni_emitter_writev32(emitter, bytes);
+  }
+  compound->count++;
+}
+
+static inline void emit_symbol_bytes(pni_emitter_t* emitter, 
pni_compound_context* compound, pn_bytes_t bytes) {
+  emit_accumulated_nulls(emitter, compound);
+  if (bytes.size < 256) {
+    pni_emitter_writef8(emitter, PNE_SYM8);
+    pni_emitter_writev8(emitter, bytes);
+  } else {
+    pni_emitter_writef8(emitter, PNE_SYM32);
+    pni_emitter_writev32(emitter, bytes);
+  }
+  compound->count++;
+}
+
+static inline void emit_symbol(pni_emitter_t* emitter, pni_compound_context* 
compound, const char* symbol) {
+  // FIXME: Yuck we need to use strlen to find the end of the string - would 
be better to take pn_bytes_t
+  if (symbol == NULL) {
+    emit_null(emitter, compound);
+  } else {
+    size_t size = strlen(symbol);
+    emit_symbol_bytes(emitter, compound, (pn_bytes_t){.size = size, .start = 
symbol});
+  }
+}
+
+static inline void emit_string(pni_emitter_t* emitter, pni_compound_context* 
compound, const char* string) {
+  // FIXME: Yuck we need to use strlen to find the end of the string - would 
be better to take pn_bytes_t
+  if (string == NULL) {
+    emit_null(emitter, compound);
+  } else {
+    size_t size = strlen(string);
+    emit_string_bytes(emitter, compound, (pn_bytes_t){.size = size, .start = 
string});
+  }
+}
+
+static inline void emit_binarynonull(pni_emitter_t* emitter, 
pni_compound_context* compound, size_t size, const char* bytes) {
+  emit_binary_bytes(emitter, compound, (pn_bytes_t){.size = size, .start = 
bytes});
+}
+
+static inline void emit_binaryornull(pni_emitter_t* emitter, 
pni_compound_context* compound, size_t size, const char* bytes) {
+  if (bytes == NULL) {
+    emit_null(emitter, compound);
+  } else {
+    emit_binary_bytes(emitter, compound, (pn_bytes_t){.size = size, .start = 
bytes});
+  }
+}
+
+// NB: This function is only correct because it currently can only be called 
to fill out an array
+static inline void emit_counted_symbols(pni_emitter_t* emitter, 
pni_compound_context* compound, size_t count, char** symbols) {
+  // 64 is a heuristic - 64 3 character symbols will already be 256 bytes
+  if (count>64){
+    compound->type = PNE_SYM32;
+    for (size_t i=0; i<count; ++i) {
+      size_t size = strlen(symbols[i]);
+      pni_emitter_writev32(emitter, (pn_bytes_t){.size = size, .start = 
symbols[i]});
+    }
+    compound->count+=count;
+    return;
+  }
+
+  pn_bytes_t bsymbols[64];
+  size_t total_bytes = 0;
+  for (size_t i=0; i<count; ++i) {
+    // FIXME: Yuck we need to use strlen to find the end of the strings - fix 
this by changing the signature to take pn_bytes_t[]
+    size_t size = strlen(symbols[i]);
+    bsymbols[i] = (pn_bytes_t){.size = size, .start = symbols[i]};
+    total_bytes += size+1; // Assuming PNE_SYM8 encoding
+  }
+  if (total_bytes<256) {
+    compound->type = PNE_SYM8;
+    for (size_t i=0; i<count; ++i) {
+      pni_emitter_writev8(emitter, bsymbols[i]);
+    }
+  } else {
+    compound->type = PNE_SYM32;
+    for (size_t i=0; i<count; ++i) {
+      pni_emitter_writev32(emitter, bsymbols[i]);
+    }
+  }
+  compound->count+=count;
+}
+
+static inline void pni_emitter_data(pni_emitter_t* emitter, pn_data_t* data) {
+  ssize_t data_size = 0;
+  if (emitter->position >= emitter->size ||
+      PN_OVERFLOW == (data_size = pn_data_encode(data, 
emitter->output_start+emitter->position, emitter->size-emitter->position))) {
+    emitter->position += pn_data_encoded_size(data);
+  } else {
+    emitter->position += data_size;
+  }
+}
+
+static inline void emit_copy(pni_emitter_t* emitter, pni_compound_context* 
compound, pn_data_t* data) {
+  emit_accumulated_nulls(emitter, compound);
+  if (!data || pn_data_size(data) == 0) {
+    pni_emitter_writef8(emitter, PNE_NULL);
+  } else {
+    pni_emitter_data(emitter, data);
+  }
+  compound->count++;
+}
+
+static inline void emit_multiple(pni_emitter_t* emitter, pni_compound_context* 
compound, pn_data_t* data) {
+  emit_accumulated_nulls(emitter, compound);
+  if (!data || pn_data_size(data) == 0) {
+    pni_emitter_writef8(emitter, PNE_NULL);
+  } else if (pn_data_type(data) == PN_ARRAY) {
+    switch (pn_data_get_array(data)) {
+      case 0:
+        pni_emitter_writef8(emitter, PNE_NULL);
+        break;
+      case 1:
+        pn_data_enter(data);
+        pn_data_narrow(data);
+        pni_emitter_data(emitter, data);
+        pn_data_widen(data);
+        break;
+      default:
+        pni_emitter_data(emitter, data);
+    }
+  } else {
+    pni_emitter_data(emitter, data);
+  }
+  compound->count++;
+}
+
+static inline void emit_described_type_copy(pni_emitter_t* emitter, 
pni_compound_context* compound, uint64_t descriptor, pn_data_t* data) {
+  emit_descriptor(emitter, compound, descriptor);
+  pni_compound_context c = make_compound();
+  emit_copy(emitter, &c, data);
+  // Can only be a single item (probably a list though)
+  compound->count++;
+}
+
+#endif // PROTON_EMITTERS_H

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to