[ https://issues.apache.org/jira/browse/METRON-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956906#comment-15956906 ]
ASF GitHub Bot commented on METRON-822: --------------------------------------- Github user cestella commented on a diff in the pull request: https://github.com/apache/incubator-metron/pull/509#discussion_r109919377 --- Diff: metron-sensors/fastcapa/src/kafka.c --- @@ -21,11 +21,113 @@ #define POLL_TIMEOUT_MS 1000 /* - * data structures required for the kafka client + * Passed to all callback functions to help identify the connection. */ -static rd_kafka_t** kaf_h; -static rd_kafka_topic_t** kaf_top_h; -static int num_conns; +struct opaque { + int conn_id; +}; + +/* + * Data structures required for the kafka client + */ +static rd_kafka_t **kaf_h; +static rd_kafka_topic_t **kaf_top_h; +static unsigned num_conns; +static FILE *stats_fd; +static struct app_stats *kaf_conn_stats; +static struct opaque *kaf_opaque; +static uint64_t *kaf_keys; + +/* + * A callback executed when an error occurs within the kafka client + */ +static void kaf_error_cb (rd_kafka_t *rk, int err, const char *reason, void* UNUSED(opaque)) +{ + LOG_ERROR(USER1, "kafka client unexpected error; conn=%s, error=%s [%s] \n", + rd_kafka_name(rk), rd_kafka_err2str(err), reason); +} + +/* + * A callback executed when a broker throttles the producer + */ +static void kaf_throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void* UNUSED(opaque)) +{ + LOG_ERROR(USER1, "kafka client throttle event; conn=%s, time=%dms broker=%s broker_id=%"PRId32" \n", + rd_kafka_name(rk), throttle_time_ms, broker_name, broker_id); +} + +/* + * A callback executed on a fixed frequency (defined by `statistics.interval.ms`) + * that provides detailed performance statistics + */ +static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t UNUSED(json_len), void *opaque) +{ + int rc; + struct opaque *data = (struct opaque*) opaque; + int conn_id = data->conn_id; + + // update queue depth of this kafka connection + kaf_conn_stats[conn_id].depth = rd_kafka_outq_len(rk); + + // TODO this 
should be handled by a logging lib that can handle faults and rolling the output file
--- End diff --

Can we have a JIRA created on this?

> Improve Fastcapa Performance
> ----------------------------
>
> Key: METRON-822
> URL: https://issues.apache.org/jira/browse/METRON-822
> Project: Metron
> Issue Type: Improvement
> Reporter: Nick Allen
> Assignee: Nick Allen
>
> Improve the performance and scalability of the Fastcapa probe.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)