Hi, In my output plugin (see attached code), I need to process messages and send them to another daemon, so I create a new pthread to listen for and read the responses from that daemon. All of the code works well when I start rsyslog manually:
/usr/sbin/rsyslogd -f/etc/rsyslog.conf -u2 -n -irsyslog.pid -M/usr/lib/rsyslog However, when I run my plugin with rsyslogd as a service, it seems that my new thread is started but does not run properly: I see the thread print a startup message, but it does not continue printing new messages as expected. What's wrong with my code? Is there a limitation on using a new pthread? Liwei
/** omazuremds.c
This is the output module that feeds directly into mdsd. Its inputs are rsyslog messages.
It sends its output, an array of JSON object strings, to the mdsd agent.
If any network failure occurs, including mdsd not running, the data are resent to mdsd.
Input messages are processed in batches. Each batch is sent to the mdsd agent in
one send() call. After send(), the module reads the response from mdsd to validate that the data
were received properly. If an unexpected response is received, it reports an error and resends the batch.
Required configuration parameters:
- queue.dequeuebatchsize: defines the batch size. It is set in the rsyslog conf file.
- mdsdport: defines the mdsd agent port. It is set in the rsyslog conf file.
- others: see the related rsyslog conf file for other parameters related to performance and reliability.
*/
#include "config.h"
#include "rsyslog.h"
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <errno.h>
#include <unistd.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
#include "template.h"
#include "module-template.h"
#include "errmsg.h"
#include "cfsysline.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#include <poll.h>
#include "hashtable.h"
#include "hashtable_private.h"
#include <ctype.h>
/* rsyslog module boilerplate. These macros come from the rsyslog module
 * template headers included above; their expansions are not visible here. */
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("omazuremds") /* define the module name */
/* Forward declaration; the definition near the end of the file has an empty body. */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
/* internal structures
*/
DEF_OMOD_STATIC_DATA
/* Declares the `errmsg` object interface used for errmsg.LogError() calls. */
DEFobjCurrIf(errmsg)
/* Sentinel stored in instanceData.mdsd_sock while no socket is open. */
const int INVALID_SOCKET = -1; /* invalid socket value */
/* A batch of items that is sent to mdsd in one send().
 * doAction appends JSON strings to dataList and increments n;
 * SendBatchData concatenates the first n entries and resets n to 0. */
struct mdsd_batch
{
/* number of items currently stored in dataList.*/
int n;
/* An array of JSON object strings. The array itself is allocated in
   createWrkrInstance with room for instanceData.batchSize pointers. */
char** dataList;
};
/* Per-action state, allocated by createInstance (called from newActInst).
Use a hashtable cache to store the tag/json objects:
Use a reader thread to read data from mdsd; if receiving any tag from mdsd, remove it from the hash.
If reader thread read fails, mark the read failure.
NOTE(review): ReadDataFromMdsd polls readOK without taking mutBatch, while
SetReadOK writes it under the lock -- confirm this unsynchronized read is acceptable.
*/
typedef struct _instanceData {
int batchSize; /* queue.dequeuebatchsize; this is the batch size. 0 until newActInst parses the config. */
int mdsdPort; /* mdsd port number (connected to on the loopback address) */
pthread_mutex_t mutBatch; /* guards cacheTable operations, socket setup/teardown and readOK writes. */
int mdsd_sock; /* socket to communicate with MDSD; INVALID_SOCKET while not connected */
/* hash cache to store all json string. key=tag; value: json object string.*/
struct hashtable *cacheTable;
pthread_t readerThread; /* mdsd reader thread to read data from mdsd; 0 while not created.*/
int readOK; /* 0: not OK for reader thread to read; 1: OK for reader thread to read. */
} instanceData;
/* Per-worker state. An action (instanceData) may have N worker instances,
 * depending on the number of worker queues; each one is set up in createWrkrInstance. */
typedef struct wrkrInstanceData {
instanceData *pData; /* the shared per-action instance this worker belongs to */
struct mdsd_batch batch; /* messages collected by doAction until endTransaction sends them */
char tagBase[16]; /* to make tag to be unique, create a unique base string first ("<epoch seconds>:").*/
} wrkrInstanceData_t;
/* Action parameters accepted in the rsyslog conf file; newActInst reads all
 * three as strings and converts the numeric ones with atoi().
 * NOTE(review): the third field is presumably the "required" flag of
 * struct cnfparamdescr -- confirm against the rsyslog headers. */
static struct cnfparamdescr actpdescr[] = {
{ "template", eCmdHdlrGetWord, 1 },
{ "queue.dequeuebatchsize", eCmdHdlrGetWord, 1 },
{ "mdsdport", eCmdHdlrGetWord, 1 }
};
/* Parameter block handed to nvlstGetParams() in newActInst: version tag,
 * number of entries in actpdescr, and the descriptor table itself. */
static struct cnfparamblk actpblk = {
CNFPARAMBLK_VERSION,
sizeof(actpdescr)/sizeof(struct cnfparamdescr),
actpdescr
};
/* (re)set config variables to default values.
 * Delegates to resetConfigVariables(), which currently has nothing to reset. */
BEGINinitConfVars
CODESTARTinitConfVars
resetConfigVariables(NULL, NULL);
ENDinitConfVars
int CreateReaderThread(instanceData* pData);
/* Resource allocation: create a new pData instance and initialize it.
* Each process has only 1 such an instance.
*/
BEGINcreateInstance
CODESTARTcreateInstance
ASSERT(pData != NULL);
pData->batchSize = 0;
pthread_mutex_init(&pData->mutBatch, NULL);
pData->mdsd_sock = INVALID_SOCKET;
pData->cacheTable = create_hashtable(pData->batchSize, hash_from_string, key_equals_string, NULL);
pData->readerThread = 0;
pData->readOK = 0;
if (CreateReaderThread(pData) > 0)
{
dbgprintf("omazuremds: createInstance error at CreateReaderThread\n");
iRet=RS_RET_ERR;
}
ENDcreateInstance
/* Defines worker instance. Each rsyslog process has only 1 instance (InstanceData).
* It may have multiple worker instances defined by multiple worker queues.
* The batch log data need to be tracked in worker instance.
*
* The macros define pWrkrData, where WrkrData->pData = pData.
*/
BEGINcreateWrkrInstance
int batchSize;
CODESTARTcreateWrkrInstance
pWrkrData->batch.n = 0;
batchSize = pData->batchSize;
pWrkrData->batch.dataList = (char**)malloc(batchSize * sizeof(char*));
snprintf(pWrkrData->tagBase, sizeof(pWrkrData->tagBase), "%d:", (int)time(0));
ENDcreateWrkrInstance
/* Feature negotiation: the only feature this module reports as compatible
 * is repeated-message reduction; for anything else iRet keeps the value
 * the surrounding macros initialised it with. */
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
    if (sFEATURERepeatedMsgReduction == eFeat) {
        iRet = RS_RET_OK;
    }
ENDisCompatibleWithFeature
/* free any resource allocated in createInstance. */
BEGINfreeInstance
CODESTARTfreeInstance
pthread_mutex_destroy(&pData->mutBatch);
pData->batchSize = 0;
hashtable_destroy(pData->cacheTable, 1);
if (pData->readerThread > 0) {
pthread_cancel(pData->readerThread);
pData->readerThread = 0;
}
pData->readOK = 0;
ENDfreeInstance
/* Free the worker instance resources */
BEGINfreeWrkrInstance
CODESTARTfreeWrkrInstance
free(pWrkrData->batch.dataList);
pWrkrData->batch.dataList = NULL;
pWrkrData->batch.n = 0;
ENDfreeWrkrInstance
/* Debug hook for printing instance information; intentionally empty,
 * this module prints nothing here. */
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
ENDdbgPrintInstInfo
/* Return 1 if str is NULL, empty, or consists only of whitespace characters
 * (as classified by isspace()). Otherwise, return 0.
 */
int IsEmptyOrWhiteSpace(const char * str)
{
    const unsigned char *p;

    if (str == NULL) {
        return 1;
    }
    /* Walk the bytes as unsigned char: passing a negative char value
     * (any byte >= 0x80 where char is signed) to isspace() is undefined. */
    for (p = (const unsigned char *)str; *p != '\0'; p++) {
        if (!isspace(*p)) {
            return 0;
        }
    }
    return 1;
}
/* Create a new string that combines all strings in a given array, in order.
inputs:
- strArray: string array. NULL entries are skipped.
- nitems: number of items in string array.
return: new combined string, or NULL if strArray is NULL, nitems <= 0, or
memory cannot be allocated. A non-NULL result must be freed by the caller.
*/
char* CreateNewString(char** strArray, int nitems)
{
    char *newStr;
    char *writePos;
    size_t totalLen = 0;
    size_t itemLen;
    int i;

    if (strArray == NULL || nitems <= 0) {
        return NULL;
    }
    /* first pass: size of the result (size_t, so large batches cannot overflow an int) */
    for (i = 0; i < nitems; i++) {
        if (strArray[i] != NULL) {
            totalLen += strlen(strArray[i]);
        }
    }
    newStr = (char*)malloc(totalLen + 1);
    if (newStr == NULL) {
        return NULL;
    }
    /* second pass: copy the pieces back to back */
    writePos = newStr;
    for (i = 0; i < nitems; i++) {
        if (strArray[i] == NULL) {
            continue;
        }
        itemLen = strlen(strArray[i]);
        memcpy(writePos, strArray[i], itemLen);
        writePos += itemLen;
    }
    *writePos = '\0';
    return newStr;
}
/* Set up the connection to the MDSD agent socket (loopback, pData->mdsdPort).
 * If a socket is already open this is a no-op. Otherwise a socket is created
 * and connect() is retried every 100 ms; after NETWORK_RETRY failed retries
 * the function gives up.
 *
 * There are several socket failure cases:
 * - cannot do socket()/connect(): close socket and re-create.
 * - send data to mdsd fails: close and recreate socket.
 * - read mdsd data fails: no need to close socket/recreate.
 *
 * The whole operation, including the retry sleeps, runs with mutBatch held.
 *
 * Return: RS_RET_OK on success, RS_RET_SUSPENDED if no connection could be
 * established (pData->mdsd_sock is INVALID_SOCKET in that case).
 */
rsRetVal SetupConnectionWithMdsd(instanceData* pData)
{
    DEFiRet;
    struct sockaddr_in addr;
    int retries;
    char errorstr[256];
    char* errRC;
    const int NETWORK_RETRY=10;

    assert(pData != NULL);
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(pData->mdsdPort);
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    pthread_mutex_lock(&pData->mutBatch);
    dbgprintf("omazuremds: setupConnectionWithMdsd socket: %d ...\n", pData->mdsd_sock);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds: setupConnectionWithMdsd socket: %d ...\n", pData->mdsd_sock);
#endif
    if(pData->mdsd_sock == INVALID_SOCKET) {
        for(retries = 0; ; retries++) {
            dbgprintf("omazuremds: setupConnectionWithMdsd creating new socket fd=%d\n", pData->mdsd_sock);
            if((pData->mdsd_sock=socket(AF_INET, SOCK_STREAM , 0))==-1) {
                errRC = strerror_r(errno, errorstr, sizeof(errorstr));
                errmsg.LogError(0, RS_RET_ERR, "omazuremds error at socket(). errno=%s\n", errRC);
                ABORT_FINALIZE(RS_RET_SUSPENDED);
            }
            if(connect(pData->mdsd_sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
                break;
            }
            if(retries == NETWORK_RETRY) {
                errRC = strerror_r(errno, errorstr, sizeof(errorstr));
                errmsg.LogError(0, RS_RET_ERR, "omazuremds error at connect(). errno=%s\n", errRC);
                ABORT_FINALIZE(RS_RET_SUSPENDED);
            }
            /* a socket whose connect() failed is not reused: start over with a fresh one */
            close(pData->mdsd_sock);
            pData->mdsd_sock = INVALID_SOCKET;
            usleep(100000); /* 100 ms = 100,000 us */
        }
    }
finalize_it:
    /* Only close a descriptor we actually own; when socket() itself failed
     * mdsd_sock is already INVALID_SOCKET and must not be passed to close(). */
    if(iRet != RS_RET_OK && pData->mdsd_sock != INVALID_SOCKET) {
        close(pData->mdsd_sock);
        pData->mdsd_sock = INVALID_SOCKET;
    }
    dbgprintf("omazuremds: done with setupConnectionWithMdsd. iRet=%d\n", iRet);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE,"omazuremds: done with setupConnectionWithMdsd. iRet=%d\n", iRet);
#endif
    pthread_mutex_unlock(&pData->mutBatch);
    RETiRet;
}
/* Store the pair (key = tagStr, value = jsonStr) in the instance's cache table.
 * The insert itself is done with mutBatch held, so several workers may call
 * this concurrently.
 *
 * Returns: 0 when the pair was inserted, 1 when hashtable_insert() reported
 * failure (an error is logged in that case).
 */
int AddDataToCache(instanceData* pData, char* tagStr, char* jsonStr)
{
    int inserted;

    assert(pData != NULL);
    assert(tagStr != NULL);
    assert(jsonStr != NULL);

    dbgprintf("omazuremds AddDataToCache tag='%s'\n", tagStr);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds AddDataToCache tag='%s'\n", tagStr);
#endif

    pthread_mutex_lock(&pData->mutBatch);
    inserted = hashtable_insert(pData->cacheTable, tagStr, jsonStr);
    pthread_mutex_unlock(&pData->mutBatch);

    if (inserted) {
        return 0;
    }
    errmsg.LogError(0, RS_RET_ERR, "omazuremds AddDataToCache error: tag='%s', value='%s'\n", tagStr, jsonStr);
    return 1;
}
/*
 * Drop the cache entry whose key equals tagStr and free its JSON string.
 * An empty/whitespace tag is rejected with a debug message. A tag that is
 * not in the cache only produces a debug warning: objects that were resent
 * may already have been removed.
 */
void RemoveDataFromCache(instanceData* pData, char* tagStr)
{
    char* removedJson;

    assert(pData != NULL);
    assert(tagStr != NULL);

    if (IsEmptyOrWhiteSpace(tagStr)) {
        dbgprintf("omazuremds RemoveDataFromCache: error. unexpected tagStr: empty or white space\n");
        return;
    }

    /* hashtable_remove() frees the stored key and hands back the value */
    pthread_mutex_lock(&pData->mutBatch);
    removedJson = hashtable_remove(pData->cacheTable, tagStr);
    pthread_mutex_unlock(&pData->mutBatch);

    if (removedJson == NULL) {
        dbgprintf("omazuremds RemoveDataFromCache: warning: object not found: tag='%s'; value=NULL\n", tagStr);
        return;
    }
    free(removedJson);
}
/* Report how many entries the cache table currently holds.
 * The count is read with mutBatch held. */
unsigned int GetCacheCount(instanceData* pData)
{
    unsigned int entries;

    assert(pData != NULL);

    pthread_mutex_lock(&pData->mutBatch);
    entries = hashtable_count(pData->cacheTable);
    pthread_mutex_unlock(&pData->mutBatch);

    return entries;
}
/*
 * Parse mdsd response string to get the tag\n. For each tag, remove it from the
 * cache and free the JSON object string memory.
 *
 * responseStr may contain incomplete data from mdsd. example: 'tag1\ntag2', where
 * there is no \n at the end of tag2. The incomplete data tag2 won't be processed.
 * An empty string, or one without any '\n', is left completely unprocessed.
 *
 * Return: number of bytes in responseStr that are processed (0 if nothing
 * could be processed, including when the working copy cannot be allocated).
 */
int ProcessMdsdResponse(instanceData* pData, const char* responseStr)
{
    const char delimiter = '\n';
    const char *lastDelimiter;
    char *copy;
    char *tag;
    char *saveptr = NULL;
    size_t dataLen;

    assert(pData != NULL);
    assert(responseStr != NULL);

    /* Everything up to and including the last '\n' is complete. Looking for
     * the delimiter first also covers the empty string, for which indexing
     * the "last character" would read before the start of the buffer. */
    lastDelimiter = strrchr(responseStr, delimiter);
    if (lastDelimiter == NULL) {
        return 0;
    }
    dataLen = (size_t)(lastDelimiter - responseStr) + 1;

    /* strtok_r() modifies its input, so tokenize a private copy */
    copy = (char*)malloc(dataLen + 1);
    if (copy == NULL) {
        /* nothing consumed; the caller keeps the data and retries on the next read */
        return 0;
    }
    memcpy(copy, responseStr, dataLen);
    copy[dataLen] = '\0';

    for (tag = strtok_r(copy, "\n", &saveptr);
         tag != NULL;
         tag = strtok_r(NULL, "\n", &saveptr)) {
        RemoveDataFromCache(pData, tag);
    }

    free(copy);
    return (int)dataLen;
}
/* Update the instance's readOK flag (1: the reader thread may read from
 * mdsd, 0: it should wait). The write is done with mutBatch held. */
void SetReadOK(instanceData *pData, int newValue)
{
    assert(newValue == 1 || newValue == 0);

    pthread_mutex_lock(&pData->mutBatch);
    pData->readOK = newValue;
    pthread_mutex_unlock(&pData->mutBatch);
}
int ResendCacheData(instanceData* pData);
/* Read data from mdsd and validate the results.
 * Reads in a loop for as long as read() succeeds. As soon as read() fails or
 * mdsd closes the connection, the function returns to the caller (an
 * interrupted read, EINTR, is simply retried).
 * Once data are read, they are processed assuming format TAG1\nTAG2\n.... Partial data are
 * strings that don't end with '\n'. They are kept in the buffer and completed by the next read().
 * After each successful read, if at least resendTime seconds have passed since
 * the last resend, everything still in the cache is resent.
 *
 * returns: number of errors (always > 0 when this function returns).
 */
int ReadDataFromMdsdOnce(instanceData* pData)
{
    const int initBufLen = 1024;
    const int trigger = initBufLen/2; /* grow the buffer when free space drops to this */
    const int resendTime = 30; /* in seconds. frequency to do the resend */
    int n;
    int nerrs = 0;
    char* responseBuf;
    char* grownBuf;
    int leftLen = initBufLen; /* left free space in buffer */
    int bufLen = initBufLen;
    int index = 0; /* number of buffered bytes; invariant: index == bufLen - leftLen */
    char errorstr[256];
    char* errRC;
    int nProcessed = 0;
    int nNotProcessed = 0;
    struct timespec prevClock;
    struct timespec currClock;

    assert(pData != NULL);
    if (INVALID_SOCKET == pData->mdsd_sock) {
#ifdef DEBUGMDS
        errmsg.LogError(0, RS_RET_NONE, "omazuremds: ReadDataFromMdsdOnce. error. sock=%d\n", pData->mdsd_sock);
#endif
        dbgprintf("omazuremds: ReadDataFromMdsdOnce. error. sock=%d\n", pData->mdsd_sock);
        nerrs++;
        return nerrs;
    }

    /* allocate only after the early return above, so that path cannot leak */
    responseBuf = (char*) malloc(bufLen+1);
    if (responseBuf == NULL) {
        dbgprintf("omazuremds: ReadDataFromMdsdOnce: out of memory\n");
        nerrs++;
        return nerrs;
    }

    clock_gettime(CLOCK_MONOTONIC_RAW, &prevClock);
    while(1)
    {
#ifdef DEBUGMDS
        errmsg.LogError(0, RS_RET_NONE, "omazuremds: ReadDataFromMdsdOnce start to read()\n");
#endif
        n = read(pData->mdsd_sock, responseBuf+index, leftLen);
#ifdef DEBUGMDS
        errmsg.LogError(0, RS_RET_NONE, "omazuremds: ReadDataFromMdsdOnce read() return = %d\n", n);
#endif
        if (n == 0) {
            /* End of stream. Leave the loop: read() would keep returning 0
             * immediately and the loop would spin at full speed. */
            dbgprintf("omazuremds: ReadDataFromMdsdOnce: read 0 bytes (connection closed by peer)\n");
            nerrs++;
            break;
        }
        if (n == -1) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal, not a socket problem */
            }
            errRC=strerror_r(errno, errorstr, sizeof(errorstr));
            errmsg.LogError(0, RS_RET_ERR, "omazuremds: ReadDataFromMdsdOnce: read() error. errno=%s\n", errRC);
            nerrs++;
            break;
        }

        leftLen -= n;
        index += n;
        responseBuf[index] = '\0';
        dbgprintf("omazuremds: ReadDataFromMdsdOnce: responseBuf='%s'\n", responseBuf);
        nProcessed = ProcessMdsdResponse(pData, responseBuf);
        if (nProcessed > 0) {
            /* handle partial TAG data from response: shift it to the buffer start */
            nNotProcessed = index - nProcessed;
            memmove(responseBuf, responseBuf+nProcessed, nNotProcessed+1); /* also move the '\0'.*/
            leftLen += nProcessed;
            index -= nProcessed;
        }
        if (leftLen <= trigger) {
            /* keep the old pointer until realloc() is known to have succeeded */
            grownBuf = (char*)realloc(responseBuf, bufLen+initBufLen+1);
            if (grownBuf == NULL) {
                dbgprintf("omazuremds: ReadDataFromMdsdOnce: out of memory\n");
                nerrs++;
                break;
            }
            responseBuf = grownBuf;
            leftLen += initBufLen;
            bufLen += initBufLen;
        }

        /* check whether we need to do resend*/
        clock_gettime(CLOCK_MONOTONIC_RAW, &currClock);
        if ((int)(currClock.tv_sec - prevClock.tv_sec) >= resendTime) {
            nerrs += ResendCacheData(pData);
            clock_gettime(CLOCK_MONOTONIC_RAW, &prevClock);
            if (nerrs > 0) {
                break;
            }
        }
    }
    free(responseBuf);
    responseBuf = NULL;
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds: ReadDataFromMdsdOnce. nerrs=%d\n", nerrs);
#endif
    dbgprintf("omazuremds: ReadDataFromMdsdOnce. nerrs=%d\n", nerrs);
    return nerrs;
}
/* Use socket send() to send a string to the mdsd port.
 * The connection is (re)established first if necessary. send() is repeated
 * until the whole string has been written; an interrupted call (EINTR) is
 * retried. If the data cannot be sent completely, the socket is closed and
 * readOK is set to 0; after a complete send readOK is set to 1.
 *
 * NOTE(review): the send itself is not serialized, so concurrent callers
 * could interleave their data on the shared socket -- confirm whether that
 * can happen with the configured number of workers.
 *
 * Returns: number of errors (0 on success). NULL arguments count as an error.
 */
int SendDataToMdsd(instanceData* pData, char* jsonObjectListStr)
{
    int nerrs = 0;
    int sendRet = 0;
    int jsonListLen;
    int sentLen = 0;

    assert(pData != NULL);
    assert(jsonObjectListStr != NULL);
    /* the asserts vanish under NDEBUG; never hand NULL to strlen() */
    if (pData == NULL || jsonObjectListStr == NULL) {
        return 1;
    }
    jsonListLen = strlen(jsonObjectListStr);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds: start SendDataToMdsd: %s ...\n", jsonObjectListStr);
#endif
    dbgprintf("omazuremds: start SendDataToMdsd: %s ...\n", jsonObjectListStr);
    if (SetupConnectionWithMdsd(pData) != RS_RET_OK)
    {
        nerrs++;
    }
    else {
        /* send() may accept only part of the data; keep going until done */
        while (sentLen < jsonListLen) {
            sendRet = send(pData->mdsd_sock, jsonObjectListStr + sentLen, jsonListLen - sentLen, 0);
            if (sendRet > 0) {
                sentLen += sendRet;
            }
            else if (sendRet == -1 && errno == EINTR) {
                continue;
            }
            else {
                break;
            }
        }
        if (sentLen != jsonListLen) {
            errmsg.LogError(0, RS_RET_ERR, "omazuremds error at send() failed, sock=%d, ret=%d\n",
                pData->mdsd_sock, sendRet);
            nerrs++;
            pthread_mutex_lock(&pData->mutBatch);
            close(pData->mdsd_sock);
            pData->mdsd_sock = INVALID_SOCKET;
            pthread_mutex_unlock(&pData->mutBatch);
            /* There must be something wrong with socket, so set ReadOK=0.*/
            SetReadOK(pData, 0);
        }
        else {
            /* data were sent; reader may start read */
            SetReadOK(pData, 1);
        }
    }
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds: SendDataToMdsd done. nerrs=%d\n", nerrs);
#endif
    return nerrs;
}
/*
 * Resend any data in the cache table to mdsd, in chunks of at most
 * pData->batchSize JSON objects per send; report error if any.
 * If batchSize has not been configured yet (<= 0) nothing is sent.
 *
 * The value pointers are collected with mutBatch held but used after it is
 * released. In this file cache values are only freed by RemoveDataFromCache,
 * which is reached from the same reader thread that calls this function.
 *
 * Return: number of errors.
 */
int ResendCacheData(instanceData* pData)
{
    int nerrs = 0;
    unsigned int count = 0;
    unsigned int collected = 0;
    unsigned int offset;
    unsigned int chunk;
    unsigned int batchSize;
    unsigned int i;
    char **strArray = NULL;
    char *jsonObjectListStr = NULL;
    struct entry *item = NULL;

    assert(pData != NULL);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "ResendCacheData start\n");
#endif
    /* The chunking below divides the work by batchSize; a value of 0 (the
     * state before the configuration has been parsed) must not get there. */
    if (pData->batchSize <= 0) {
        return nerrs;
    }
    batchSize = (unsigned int)pData->batchSize;

    /* snapshot the cached JSON strings while holding the lock */
    pthread_mutex_lock(&pData->mutBatch);
    count = pData->cacheTable->entrycount;
    if (count > 0) {
        strArray = (char**)malloc(count * sizeof(char*));
        if (strArray != NULL) {
            for (i = 0; i < pData->cacheTable->tablelength; i++) {
                for (item = pData->cacheTable->table[i];
                     item != NULL && collected < count;
                     item = item->next) {
                    strArray[collected++] = (char*) item->v;
                }
            }
        }
    }
    pthread_mutex_unlock(&pData->mutBatch);

    if (count > 0 && strArray == NULL) {
        errmsg.LogError(0, RS_RET_ERR, "omazuremds: ResendCacheData: out of memory\n");
        nerrs++;
        return nerrs;
    }

    /* full batches first, the remainder (if any) in a final smaller chunk */
    for (offset = 0; offset < collected; offset += chunk) {
        chunk = collected - offset;
        if (chunk > batchSize) {
            chunk = batchSize;
        }
        jsonObjectListStr = CreateNewString(strArray + offset, (int)chunk);
        if (jsonObjectListStr == NULL) {
            nerrs++;
            continue;
        }
        nerrs += SendDataToMdsd(pData, jsonObjectListStr);
        free(jsonObjectListStr);
        jsonObjectListStr = NULL;
    }
    free(strArray);
    strArray = NULL;
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "ResendCacheData count=%u. nerrs=%d\n", count, nerrs);
#endif
    return nerrs;
}
/*
* Read data from mdsd. It is called in a new thread.
*
* failure handling:
* - If cache is empty, which means no data are sent, mark readOk = 0;
* - If either resend failure or read fails, mark readOK = 0;
* - If there are previous failures and readOK = 1, cache may not be empty. Resend.
* - If readOK 0, wait until readOK=1 before read.
*/
void* ReadDataFromMdsd(void* vpData)
{
int nSendErrs = 0;
int nReadErrs = 0;
instanceData* pData = (instanceData*)vpData;
const int sleepMS = 100; /* 100 milli-seconds */
int count = 0;
const int MaxCount = 100;
assert(vpData != NULL);
while(1) {
/* Don't block this forever because if no msg is sent(), no way to reset ReadOK. */
count = 0;
#ifdef DEBUGMDS
errmsg.LogError(0, RS_RET_NONE, "ReadDataFromMdsd readOK=%d\n", pData->readOK);
#endif
while((0 == pData->readOK) && (count < MaxCount))
{
usleep(sleepMS*1000);
count++;
}
#ifdef DEBUGMDS
errmsg.LogError(0, RS_RET_NONE, "ReadDataFromMdsd readOK=%d. count=%d\n", pData->readOK, count);
#endif
nSendErrs = ResendCacheData(pData);
if (nSendErrs > 0) {
dbgprintf("omazuremds: ReadDataFromMdsd: resend error: nerrs=%d\n", nSendErrs);
continue;
}
nReadErrs = ReadDataFromMdsdOnce((instanceData*)pData);
if (nReadErrs > 0) {
dbgprintf("omazuremds: ReadDataFromMdsd: read error: nerrs=%d\n", nReadErrs);
SetReadOK(pData, 0);
}
else {
/* some data is read. so it is good to continue */
SetReadOK(pData, 1);
}
if (0 == GetCacheCount(pData)) {
/* no data to read */
SetReadOK(pData, 0);
dbgprintf("omazuremds: ReadDataFromMdsd: cachetable is empty\n");
}
}
pthread_exit(pData);
}
/* Start the thread that reads responses from mdsd (ReadDataFromMdsd).
 * Responses can arrive at any time, so the thread is kept alive; it is
 * created detached with a 4 MiB stack. If pData->readerThread is already
 * set, nothing is done.
 *
 * Returns: 0 on success (or if the thread already exists), 1 if
 * pthread_create() failed.
 */
int CreateReaderThread(instanceData* pData)
{
    pthread_attr_t threadAttr;
    int createRc;
    int failures = 0;

    assert(pData != NULL);

    if (pData->readerThread > 0)
    {
        dbgprintf("omazuremds: CreateReaderThread: already created: id=%ld\n", pData->readerThread);
        return failures;
    }

    pthread_attr_init(&threadAttr);
    pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_DETACHED);
    pthread_attr_setstacksize(&threadAttr, 4096*1024);

    createRc = pthread_create(&pData->readerThread, &threadAttr, ReadDataFromMdsd, (void *)pData);
    if (createRc != 0)
    {
        failures++;
        dbgprintf("omazuremds: CreateReaderThread error. error=%d\n", createRc);
        pData->readerThread = 0;
    }
    pthread_attr_destroy(&threadAttr);

    dbgprintf("omazuremds: CreateReaderThread done. nerrs=%d\n", failures);
    return failures;
}
/* Send worker queue's batch to mdsd.
 * All JSON strings collected in the worker's batch are concatenated and sent
 * in one SendDataToMdsd() call. The batch is marked empty afterwards whether
 * or not the send worked; the strings themselves stay in the cache table.
 * Inputs:
 * pWrkrData: worker instance.
 * comment: some comment for logging purpose only.
 * Returns: number of errors (0 if the batch was empty or was sent).
 */
int SendBatchData(wrkrInstanceData_t *pWrkrData, const char* comment)
{
    instanceData *pData;
    char* jsonObjectListStr;
    int nerrs = 0;

    assert(pWrkrData != NULL);
    assert(comment != NULL);
    if (pWrkrData->batch.n == 0) {
        return nerrs;
    }
    pData = pWrkrData->pData;
    dbgprintf("omazuremds: sendBatchData %s BatchSize=%d; #Items=%d\n",
        comment, pData->batchSize, pWrkrData->batch.n);
    jsonObjectListStr = CreateNewString(pWrkrData->batch.dataList, pWrkrData->batch.n);
    if (jsonObjectListStr == NULL) {
        /* could not build the payload (out of memory): report it instead of
         * passing NULL on to the send routine */
        nerrs++;
    }
    else {
        nerrs += SendDataToMdsd(pData, jsonObjectListStr);
        free(jsonObjectListStr);
        jsonObjectListStr = NULL;
    }
    /* mark worker batch data to be empty */
    pWrkrData->batch.n = 0;
    dbgprintf("omazuremds: finished sendBatchData. nerrs=%d\n", nerrs);
    return nerrs;
}
/* expected valid source format: "<source>". If <source> is empty, or not double-quoted, report error.
The value must start and end with a double quote, contain at least one
character between the quotes, and contain no further double quote.
Return 1 if valid. 0 is invalid.
*/
int IsValidSourceFormat(const char* src)
{
    size_t len;

    assert(src != NULL);
    if (IsEmptyOrWhiteSpace(src)) {
        dbgprintf("omazuremds: CreateMdsdJson error: unexpected source value: empty or white space\n");
        return 0;
    }
    len = strlen(src);
    /* len < 3 rejects a lone quote character (which would otherwise serve as
     * both the opening and the closing quote) and the empty source "". */
    if (len < 3 || '"' != src[0] || '"' != src[len-1]) {
        dbgprintf("omazuremds: CreateMdsdJson error: unexpected source value: '%s'\n", src);
        return 0;
    }
    /* no embedded double quote between the enclosing ones */
    if (memchr(src + 1, '"', len - 2) != NULL) {
        dbgprintf("omazuremds: CreateMdsdJson error: unexpected source value: '%s'\n", src);
        return 0;
    }
    return 1;
}
/* Create a unique TAG value given a string base.
 * It is used in the JSON object, which must have unique TAG value.
 * CLOCK_MONOTONIC_RAW will get clock time since computer reboots. To make tag really unique,
 * add the tagBase, which is the Epoch time.
 * The result has the form "<tagBase><seconds>.<nanoseconds>".
 *
 * Inputs:
 * tagBase: some base string to construct unique value.
 *
 * Returns: a newly allocated string that can be used as TAG value, or NULL
 * if memory cannot be allocated. The caller owns the returned string.
 */
char * CreateUniqueTag(char * tagBase)
{
    char *tagStr;
    size_t tagLen;
    struct timespec now = { 0, 0 }; /* stays 0.0 should clock_gettime() fail */

    assert(tagBase != NULL);
    clock_gettime(CLOCK_MONOTONIC_RAW, &now);
    /* 32 extra bytes: room for the two numbers, the '.' and the terminator */
    tagLen = strlen(tagBase) + 32;
    tagStr = (char*)malloc(tagLen);
    if (tagStr == NULL) {
        return NULL;
    }
    snprintf(tagStr, tagLen, "%s%d.%ld", tagBase, (int)now.tv_sec, now.tv_nsec);
    return tagStr;
}
/* Create a json object given the message log string;
 * The logString format is facility,<others>
 * Facility will be used as the source.
 * {"TAG":"<tag>", "SOURCE":"<source>", "DATA":["a","b"]}
 *
 * If logString contains a comma (not as its first character), the text
 * before the first comma is the source; it must be a valid double-quoted
 * value (see IsValidSourceFormat) and the whole logString becomes DATA.
 * Otherwise the default source "local0" is used and DATA becomes
 * "local0","<logString>". logString is inserted as is; it is not escaped.
 *
 * Input:
 * tagStr: JSON object tag.
 * logString: log string that'll be used in JSON's DATA part.
 * Returns: a new JSON object string. Its memory needs to be freed by caller.
 * NULL if tagStr/logString is empty or whitespace only, the source is not
 * valid, or memory cannot be allocated.
 */
char* CreateMdsdJson(char* tagStr, char* logString)
{
    /* if no source found in logString, use default.*/
    const char* defaultSrc = "\"local0\"";
    const char jsonFormat[] = "{\"TAG\":\"%s\", \"SOURCE\":%s,\"DATA\":[%s]}";
    const char *comma;
    const char *sourceStr; /* text placed in SOURCE */
    const char *dataStr;   /* text placed in DATA */
    char *sourceBuf = NULL;
    char *dataBuf = NULL;
    char *jsonString = NULL;
    size_t sourceLen;
    size_t dataLen;
    size_t jsonLen;

    assert(tagStr != NULL);
    assert(logString != NULL);
    if (IsEmptyOrWhiteSpace(tagStr)) {
        errmsg.LogError(0, RS_RET_ERR, "omazuremds: CreateMdsdJson error. unexpected tagStr value: empty or white space\n");
        return NULL;
    }
    if (IsEmptyOrWhiteSpace(logString)) {
        dbgprintf("omazuremds: CreateMdsdJson error: unexpected logString value: empty or white space\n");
        return NULL;
    }

    comma = strchr(logString, ',');
    if (comma != NULL && comma != logString) {
        /* source = everything before the first comma */
        sourceLen = (size_t)(comma - logString);
        sourceBuf = (char*)malloc(sourceLen + 1);
        if (sourceBuf == NULL) {
            return NULL;
        }
        memcpy(sourceBuf, logString, sourceLen);
        sourceBuf[sourceLen] = '\0';
        if (0 == IsValidSourceFormat(sourceBuf)) {
            free(sourceBuf); /* do not leak the copy on the error path */
            return NULL;
        }
        sourceStr = sourceBuf;
        dataStr = logString;
    }
    else {
        /* need to handle logString without comma scenario. so make a new string for it.*/
        dataLen = strlen(defaultSrc) + strlen(logString) + 4; /* comma, two quotes, '\0' */
        dataBuf = (char*)malloc(dataLen);
        if (dataBuf == NULL) {
            return NULL;
        }
        snprintf(dataBuf, dataLen, "%s,\"%s\"", defaultSrc, logString);
        sourceStr = defaultSrc;
        dataStr = dataBuf;
    }

    /* sizeof(jsonFormat) covers the fixed text and the terminator; the three
     * "%s" placeholders it also counts only add slack. */
    jsonLen = sizeof(jsonFormat) + strlen(tagStr) + strlen(sourceStr) + strlen(dataStr);
    jsonString = (char*)malloc(jsonLen);
    if (jsonString != NULL) {
        snprintf(jsonString, jsonLen, jsonFormat, tagStr, sourceStr, dataStr);
    }

    free(sourceBuf);
    free(dataBuf);
    return jsonString;
}
/* Verify that the MDS agent can be reached: makes sure the instance's
 * connection to mdsd is set up and hands back that result unchanged.
 * A ping message check may be added in the future.
 */
rsRetVal InitMdsdConnection(wrkrInstanceData_t *pWrkrData)
{
    DEFiRet;
    iRet = SetupConnectionWithMdsd(pWrkrData->pData);
    RETiRet;
}
/* If beginTransaction/doAction/endTransaction is suspended (RS_RET_SUSPENDED),
* resume will be called until it succeeds, or exit based on action configurations.
* After resume succeeds, beginTransaction will start.
*/
BEGINtryResume
CODESTARTtryResume
dbgprintf("omazuremds: resume action\n");
CHKiRet(InitMdsdConnection(pWrkrData));
finalize_it:
ENDtryResume
/* Begin a batch loop: beginTransaction/DoAction/endTransaction;
* If previous batch's send data fails, or something aborts before endTransaction,
* use beginTransaction to retry sending them. If the retry in beginTransaction fails,
* suspend the transaction.
*
*/
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("omazuremds: beginTransaction starts.\n");
CHKiRet(InitMdsdConnection(pWrkrData));
finalize_it:
ENDbeginTransaction
/* This step handles the input message.
 * Each message is turned into a JSON object with a unique tag, stored in the
 * cache table (key = tag) and appended to the worker's batch. When all
 * messages for a batch are saved, they are sent to mdsd in endTransaction.
 *
 * Ownership: once AddDataToCache() succeeded, the tag and the JSON string
 * belong to the cache table; the batch only borrows the JSON pointer. On
 * every failure path before that, both strings are freed here.
 *
 * Result: RS_RET_DEFER_COMMIT normally (also when the message is dropped
 * because it cannot be converted or the batch is full); RS_RET_SUSPENDED if
 * the tag cannot be allocated or the cache insert fails.
 */
BEGINdoAction
    char *logString;
    instanceData *pData;
    char *tagStr;
    char *jsonStr;
CODESTARTdoAction
    if (ppString != NULL) {
        pData = pWrkrData->pData;
        logString = (char*)ppString[0];
        ASSERT(logString != NULL);
        dbgprintf("omazuremds: start doAction: msg='%s'\n", logString);
        if (pWrkrData->batch.n >= pData->batchSize) {
            errmsg.LogError(0, RS_RET_ERR, "omazuremds: unexpected error: out of space for batch. n=%d, batchSize=%d\n",
                pWrkrData->batch.n, pData->batchSize);
        }
        else {
            tagStr = CreateUniqueTag(pWrkrData->tagBase);
            if (tagStr == NULL) {
                /* out of memory: ask rsyslog to retry later */
                iRet = RS_RET_SUSPENDED;
            }
            else {
                jsonStr = CreateMdsdJson(tagStr, logString);
                if (jsonStr == NULL) {
                    /* message cannot be converted; the tag was never stored anywhere */
                    free(tagStr);
                }
                else if (AddDataToCache(pData, tagStr, jsonStr) > 0) {
                    /* not cached, so nothing owns these strings; the message
                     * must not stay in the batch either, rsyslog will retry it */
                    free(jsonStr);
                    free(tagStr);
                    iRet = RS_RET_SUSPENDED;
                }
                else {
                    pWrkrData->batch.dataList[pWrkrData->batch.n] = jsonStr;
                    pWrkrData->batch.n++;
                }
            }
        }
        if (iRet == RS_RET_OK) {
            iRet = RS_RET_DEFER_COMMIT;
        }
    }
    dbgprintf("omazuremds: done doAction: iRet=%d\n", iRet);
ENDdoAction
/* This is called after a batch of messages are processed by doAction().
* It will send data to mdsd. If send() fails, suspend the operation until resume succeeds.
*/
BEGINendTransaction
int nerrs = 0;
CODESTARTendTransaction
nerrs += SendBatchData(pWrkrData, "endTransaction");
if (nerrs > 0) {
dbgprintf("omazuremds: endTransaction sendBatchData %d errors.\n", nerrs);
iRet = RS_RET_SUSPENDED;
}
ENDendTransaction
/* Defines actions that'll be called for each new action instance.
 * It reads the action parameters from the rsyslog conf (template,
 * queue.dequeuebatchsize, mdsdport), creates the instanceData object and
 * registers the template.
 *
 * Fails with RS_RET_MISSING_CNFPARAMS if the parameters cannot be read, the
 * batch size is not a positive number, the port is not in 1..65535, or no
 * template name was given.
 */
BEGINnewActInst
    struct cnfparamvals *pvals;
    int i;
    char* batchSizeStr = NULL;
    char* mdsdPortStr = NULL;
    char* tplName = NULL; /* must start out NULL: it is tested after the loop */
CODESTARTnewActInst
    if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
        ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
    }
    CHKiRet(createInstance(&pData));
    CODE_STD_STRING_REQUESTparseSelectorAct(1)
    for(i = 0 ; i < actpblk.nParams ; ++i) {
        if(!pvals[i].bUsed)
            continue;
        if(strcmp(actpblk.descr[i].name, "template") == 0) {
            tplName = es_str2cstr(pvals[i].val.d.estr, NULL);
            if (tplName != NULL) {
                dbgprintf("omazuremds: newact templateName = '%s'\n", tplName);
            }
        }
        else if (strcmp(actpblk.descr[i].name, "queue.dequeuebatchsize") == 0)
        {
            batchSizeStr = es_str2cstr(pvals[i].val.d.estr, NULL);
            if (batchSizeStr != NULL) {
                dbgprintf("omazuremds: newact batch size = '%s'\n", batchSizeStr);
                pData->batchSize = atoi(batchSizeStr);
                free(batchSizeStr);
                batchSizeStr = NULL;
            }
        }
        else if (strcmp(actpblk.descr[i].name, "mdsdport") == 0)
        {
            mdsdPortStr = es_str2cstr(pvals[i].val.d.estr, NULL);
            if (mdsdPortStr != NULL)
            {
                pData->mdsdPort = atoi(mdsdPortStr);
                free(mdsdPortStr);
                mdsdPortStr = NULL;
            }
        }
        else {
            dbgprintf("omazuremds: program error, non-handled "
                "param '%s'\n", actpblk.descr[i].name);
        }
    }
    if (pData->batchSize <= 0)
    {
        dbgprintf("omazuremds: action requires queue.dequeuebatchsize");
        free(tplName); /* not handed over to the OMSR yet */
        tplName = NULL;
        ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
    }
    if (pData->mdsdPort <= 0 || pData->mdsdPort > 65535)
    {
        /* the port is later passed through htons(); reject values that cannot be a TCP port */
        dbgprintf("omazuremds: action requires a valid mdsdport (1-65535)\n");
        free(tplName);
        tplName = NULL;
        ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
    }
    if(tplName == NULL) {
        dbgprintf("omazuremds: action requires a template name");
        ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
    }
    /* template string 0 is just a regular string */
    OMSRsetEntry(*ppOMSR, 0, (uchar*)tplName, OMSR_NO_RQD_TPL_OPTS);
    CODE_STD_FINALIZERnewActInst
    cnfparamvalsDestruct(pvals, &actpblk);
    dbgprintf("omazuremds: ENDnewActInst\n");
ENDnewActInst
/* Handler for legacy (selector-style) configuration lines.
 * Legacy format is not supported: when the line starts with ":omazuremds:"
 * an error pointing to the action() syntax is logged. In every case the
 * line is reported back as unprocessed.
 */
BEGINparseSelectorAct
CODESTARTparseSelectorAct
CODE_STD_STRING_REQUESTparseSelectorAct(1)
    if (strncmp((char*) p, ":omazuremds:", sizeof(":omazuremds:") - 1) == 0) {
        errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
            "omazuremds supports only v6 config format, use: "
            "action(type=\"omazuremds\" ...)");
    }
    ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
/* Module exit hook; intentionally empty, this module adds no cleanup code here. */
BEGINmodExit
CODESTARTmodExit
ENDmodExit
/* Entry point query: these macros publish the module's entry points to the
 * rsyslog core (standard output-module, v8 worker, new-style config and
 * transactional interface queries). */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES /* for new config syntax */
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* To support the transactional interface! */
ENDqueryEtryPt
/* Reset config variables for this module to default values.
 * The module keeps no legacy config variables, so the body does nothing and
 * returns the initial value of iRet. Both parameters are unused; the
 * signature is the one expected by omsdRegCFSLineHdlr() in modInit.
 */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
DEFiRet;
RETiRet;
}
/* Module initialization: announces the module interface version, acquires
 * the errmsg object, refuses to load when the rsyslog core does not support
 * batching, and registers the "resetconfigvariables" legacy directive.
 * NOTE(review): bCoreSupportsBatching is not declared anywhere in the visible
 * part of this file -- confirm it is provided by a header or add a
 * file-scope declaration.
 */
BEGINmodInit()
CODESTARTmodInit
INITLegCnfVars
*ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
/* this module relies on the transactional interface, so batching support is mandatory */
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
if(!bCoreSupportsBatching) {
errmsg.LogError(0, NO_ERRCODE, "omazuremds: batching is not supported. rsyslog core too old.");
ABORT_FINALIZE(RS_RET_ERR);
}
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
etc.tar
Description: Unix tar archive
_______________________________________________ rsyslog mailing list http://lists.adiscon.net/mailman/listinfo/rsyslog http://www.rsyslog.com/professional-services/ What's up with rsyslog? Follow https://twitter.com/rgerhards NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE THAT.

