[rtg] buffering mechanism

Leech, Jonathan jleech at virtela.net
Tue May 23 19:30:38 EDT 2006


Matt,

I was initially thinking of IPC, so as to have a separate process writing to the database... and that doesn't fit well with the RTG architecture.

But I thought about it some more, and I think it might be useful anyway, since a message queue might survive all kinds of bad things: the RTG process being stopped or hanging, the box losing power, etc.

As for buffering I'm thinking along the lines of the following:
- every T time period
 - invoke up to N database writer threads 
  - each writing up to R rows from the queue to the database and committing.

-Jonathan


-----Original Message-----
From: Matt Provost [mailto:mprovost at termcap.net]
Sent: Tuesday, May 23, 2006 5:03 PM
To: Leech, Jonathan
Cc: rtg at fireflynetworks.net
Subject: Re: [rtg] buffering mechanism


On Tue, May 23, 2006 at 01:59:18PM -0600, Leech, Jonathan wrote:
> Hello,
> 
> We are considering using RTG at my company, and if so I will be making some enhancements.  
> 
> I see on the TODO list, #8. Add in buffering mechanism to buffer poll results in case the SQL database is down or unreachable, i.e. separate SQL insert thread pool.
> 
> Does anyone have a particular architecture in mind, and if not, any objections to using a Message Queue?  I found a nice intro article at http://www.ecst.csuchico.edu/~beej/guide/ipc/mq.html.  I have used Named Pipes before, in a past life.
> 
> Also I would be writing support for an Oracle database.
> 

It would be nice to have an Oracle driver going, I've heard several
people ask for it but I don't have access to Oracle to write it.

About a year ago I wrote up the buffer thing but it never got committed
since we've been trying to get the latest version out and didn't want to
add new stuff. Since it's been sitting around forever, maybe we should
just commit it sooner rather than later. I don't think it actually did
the buffering but it did implement a queue to store inserts and then
split off the db connection into a single thread, so you have multiple
polling and inserting threads. It should be pretty easy to add buffering
to this code, surely a lot easier than writing something from scratch.

The message queue and named pipe stuff is all IPC, which doesn't apply
here since we're running as a single multithreaded process. So it's a
lot more straightforward since you don't have to pass this stuff around
from process to process.


Here's the original message and two files, rtg.h and rtginsert.c that
implement this. Want to see if you can get this patched into the current
CVS version? It should be pretty straightforward.

Thanks,
Matt




Ok so I've been working on having a single db insert thread or a
per-thread connection and I got a little carried away...

Anyway I coded up a new thread function called inserter that connects to
the db and you guessed it handles the inserts. It communicates with the
snmp threads through a linked list queue that stores the values to be
inserted. So now, there are two thread pools, one doing snmp polls and
inserting the results into the queue and other db threads that pull data
from the queue and insert it. The two pools run independently of each
other, which is pretty cool to see in action. With this new code, rtgpoll
is really looking good for the 0.8 release.

The interesting part was tweaking the queue stuff to work right. Moving
the db stuff was really easy since it's only a few calls at this point
anyway. I've tried running with a single insert thread, equal numbers of
threads and more insert threads than poller threads and they all work
fine. I've run it overnight with both mysql and pgsql drivers with no
memory leaks and no instability, but granted I have a pretty limited
setup at home to test. Anyway I think I got the critical bugs.

Lots of little stuff needs to be updated. For example, we currently use
NOW() in SQL to insert the timestamp, but since an insert could sit in the
queue for a while, we should get the timestamp at poll time and save it.

I imagine one problem could be that if the db is slow it will get behind
and the queue will keep growing faster than it is emptied, but we can
put in a check for a certain queue size to throw errors or do something
more intelligent. It doesn't do it yet but this could buffer inserts in
case the db drops for a few minutes which is something that we've talked
about before. Hmm actually that's a pretty trivial change, I'll have to
work on that tonight...

In the future we could even do some cooler stuff and have a queue per
table, so the thread could grab several inserts at once and combine them
into a single db operation, which I believe is something like what Bill
has.

Anyway there's lots of stuff that I need to clean up, and a lot of
variables that I'd want to rename in the original code etc but I just
have a working copy for now. I'll attach the interesting files so you
guys can check them out. I'm looking for some feedback as to whether
this is the right direction to take. I think this is in line with what
we have discussed on the list before. If you guys think this is cool,
I'll clean it up and start committing a minimal diff that gets it in
there, and then start cleaning up variable names etc and getting it
ready for release. Also need to update the config file to have
SNMP_Threads and DB_Threads as separate variables or something like
that.

Thanks for taking a look!
Matt

rtg.h:
/****************************************************************************
   Program:     $Id: rtg.h,v 1.41 2005/03/10 06:40:33 mprovost Exp $ 
   Author:      $Author: mprovost $
   Date:        $Date: 2005/03/10 06:40:33 $
   Description: RTG headers
****************************************************************************/

#include "common.h"
#include <sys/param.h>

#ifndef _RTG_H_
#define _RTG_H_ 1

/****** XXX 0.8 FEATURES -- Remove, resolve or fix for release */
#define FEATURES 1
#define RATE_INSERT 0
/****** XXX 0.8 FEATURES -- Remove, resolve or fix for release */

/* Defines */ 
#ifndef FALSE
# define FALSE 0
#endif
#ifndef TRUE
# define TRUE !FALSE
#endif

/* Define Linux pthread brokeness */
/* #define BROKEN_LINUX 1 */

/* Constants */
#define MAX_THREADS 100
#define BUFSIZE 512
#define BITSINBYTE 8
#define THIRTYTWO 4294967295ul
#ifdef HAVE_STRTOLL
# define SIXTYFOUR 18446744073709551615ull
#else
# define SIXTYFOUR 18446744073709551615ul
#endif
#define KILO 1000
#define MEGA (unsigned int)(KILO * KILO)
#define GIGA (unsigned long long)(MEGA * KILO)
#define TERA (unsigned long long)(GIGA * KILO)


/* Define CONFIG_PATHS places to search for the rtg.conf file.  Note
   that RTG_HOME, as determined during autoconf is one path */
#define CONFIG_PATHS 3
#define CONFIG_PATH_1 ""
#define CONFIG_PATH_2 "/etc/"

/* Defaults */
#define DEFAULT_CONF_FILE "rtg.conf"
#define DEFAULT_THREADS 5
#define DEFAULT_INTERVAL 300
#define DEFAULT_HIGHSKEWSLOP 3
#define DEFAULT_LOWSKEWSLOP .5
#define DEFAULT_OUT_OF_RANGE 93750000000ull
#define DEFAULT_DB_DRIVER "librtgmysql.so"
#define DEFAULT_DB_HOST "localhost"
#define DEFAULT_DB_DB "rtg"
#define DEFAULT_DB_USER "snmp"
#define DEFAULT_DB_PASS "rtgdefault"
#define DEFAULT_SNMP_VER 1
#define DEFAULT_SNMP_PORT 161
#define DEFAULT_SYSLOG_FACILITY LOG_LOCAL2

/* PID File */
#define PIDFILE "/var/run/rtgpoll.pid"

#define STAT_DESCRIP_ERROR 99
#define HASHSIZE 5000

/* pthread error messages */
#define PML_ERR "pthread_mutex_lock error\n"
#define PMU_ERR "pthread_mutex_unlock error\n"
#define PCW_ERR "pthread_cond_wait error\n"
#define PCB_ERR "pthread_cond_broadcast error\n"

/* pthread macros; the do/while (0) wrapper keeps them safe inside if/else */
#define PT_MUTEX_LOCK(x) do { if (pthread_mutex_lock(x) != 0) printf(PML_ERR); } while (0)
#define PT_MUTEX_UNLOCK(x) do { if (pthread_mutex_unlock(x) != 0) printf(PMU_ERR); } while (0)
#define PT_COND_WAIT(x,y) do { if (pthread_cond_wait(x, y) != 0) printf(PCW_ERR); } while (0)
#define PT_COND_BROAD(x) do { if (pthread_cond_broadcast(x) != 0) printf(PCB_ERR); } while (0)

/* Verbosity levels LOW=info HIGH=info+SQL DEBUG=info+SQL+junk */
enum debugLevel {OFF, LOW, HIGH, DEBUG, DEVELOP}; 

/* These ugly macros keep the code clean and everything inline for speed */
#define sysloginfo(x...) syslog(LOG_INFO |  LOG_LOCAL2, x) // replace with conf'g facility
#define syslogcrit(x...) syslog(LOG_CRIT |  LOG_LOCAL2, x) // replace with conf'g facility
#define debug(level,x...) do {if (set->verbose >= level) {if (set->daemon) sysloginfo(x); else fprintf(stdout,x);} } while (0)
#define debugfile(dfp,level,x...) do {if (set->verbose >= level) {if (set->daemon) sysloginfo(x); else fprintf(dfp,x);} } while (0)
#define fatal(x...) do { if (set->daemon) syslogcrit(x); else fprintf(stderr,x); exit(-1); } while (0)
#define fatalfile(dfp,x...) do { fprintf(dfp,x); exit(-1); } while (0)

/* Target state */
enum targetState {NEW, LIVE, STALE};

/* Typedefs */
typedef struct worker_struct {
    int index;
    pthread_t thread;
    struct crew_struct *crew;
} worker_t;

typedef struct dbworker_struct {
    int index;
    pthread_t thread;
} dbworker_t;

typedef struct config_struct {
    unsigned int interval;
    char dbdriver[MAXPATHLEN];
    char dbhost[80];
    char dbdb[80];
    char dbuser[80];
    char dbpass[80];
    enum debugLevel verbose;
    unsigned short withzeros;
    unsigned short dboff;
    unsigned short multiple;
    unsigned short snmp_port;
    unsigned short threads;
    unsigned short daemon;
    unsigned short syslog;
    float highskewslop;
    float lowskewslop;
} config_t;

typedef struct target_struct {
    struct host_struct *host;
    char *objoid;
    char *description;
    char *table;
    unsigned short bits;
    unsigned int iid;
#ifdef HAVE_STRTOLL
    unsigned long long maxspeed;
#else
    unsigned long maxspeed;
#endif
    enum targetState init;
    unsigned long long last_value;
#ifdef FEATURES
    struct timeval last_time;
#endif
    struct target_struct *next;
} target_t;

typedef struct host_struct {
    char *host;
    char *community;
    unsigned short snmp_ver;
} host_t;

typedef struct crew_struct {
    int work_count;
    worker_t member[MAX_THREADS];
    pthread_mutex_t mutex;
    pthread_cond_t done;
    pthread_cond_t go;
} crew_t;

typedef struct dbcrew_struct {
    dbworker_t member[MAX_THREADS];
} dbcrew_t;

typedef struct poll_stats {
    pthread_mutex_t mutex;
    unsigned long long polls;
    unsigned long long db_inserts;
    unsigned int round;
    unsigned int wraps;
    unsigned int no_resp;
    unsigned int out_of_range;
    unsigned int errors;
    unsigned int slow;
#ifdef FEATURES
    unsigned int zero;
    unsigned int flat;
#endif
    double poll_time; 
} stats_t;

typedef struct hash_struct {
    int bucket;
    target_t *target;
    target_t **table;
} hash_t;

typedef struct insert_struct {
   char *table;
   int iid;
   unsigned long long counter;
   double rate;
} insert_t;

typedef struct insert_queue_node {
   insert_t insert;
   struct insert_queue_node *next;
} insert_q_node;

/* linked list queue for inserts */
typedef struct insert_queue {
   pthread_mutex_t lock;
   pthread_cond_t todo;
   insert_q_node *front;
   insert_q_node *rear;
   unsigned long count;
} insert_q;

int startq(insert_q *);
int pushq(insert_q *, insert_t);
insert_t * popq(insert_q *); 

/* Precasts: rtgpoll.c */
void *sig_handler(void *);
void usage(char *);

/* Precasts: rtgpoll.c */
void *poller(void *);

/* Precasts: rtginsert.c */
void *inserter(void *);

#if HAVE_MYSQL
/* Precasts: rtgmysql.c */
int mysql_db_insert(char *, MYSQL *);
int mysql_dbconnect(char *, MYSQL *);
void mysql_dbdisconnect(MYSQL *);
void mysql_close_db(void *);
#endif

#if HAVE_PGSQL
/* Precasts: rtgpgsql.c */
PGresult * pgsql_db_query(char *, PGconn *);
int pgsql_db_insert(char *, PGconn *);
PGresult * pgsql_db_select(char *, PGconn *);
int pgsql_db_begin(PGconn *);
int pgsql_db_commit(PGconn *);
PGconn * pgsql_dbconnect(char *, PGconn *);
void pgsql_dbdisconnect(PGconn *);
void pgsql_close_db(void *);
#endif

/* Precasts: rtgutil.c */
int read_rtg_config(char *, config_t *);
int write_rtg_config(char *, config_t *);
void config_defaults(config_t *);
void print_stats (stats_t, config_t *);
int sleepy(float, config_t *);
void timestamp(char *);
double timediff(struct timeval, struct timeval);
int checkPID(char *, config_t *);
int daemon_init();

/* Precasts: rtghash.c */
void init_hash();
void init_hash_walk();
target_t *getNext();
void free_hash();
unsigned long make_key(const void *);
void mark_targets(int);
unsigned int delete_targets(int);
void walk_target_hash();
void *in_hash(target_t *, target_t *);
int compare_targets(target_t *, target_t *);
int del_hash_entry(target_t *);
int add_hash_entry(target_t *);
int hash_target_file(char *);

/* extern config_t set; */
extern int lock;
extern int waiting;
extern char config_paths[CONFIG_PATHS][BUFSIZE];
extern FILE *dfp;

#endif /* not _RTG_H_ */

rtginsert.c:
#include "common.h"
#include "rtg.h"
#include "rtgdbi.h"

extern config_t *set;

/* initialize the insert queue */
int startq(insert_q *iq) {
        pthread_mutexattr_t mta;

        /* the mutex needs to be recursive because inserter() holds it
           while calling popq(), which locks it again; build an attribute */
        pthread_mutexattr_init(&mta);
        pthread_mutexattr_settype(&mta, PTHREAD_MUTEX_RECURSIVE);

        /* init the recursive mutex */
        pthread_mutex_init(&iq->lock, &mta);

        /* destroy the attribute now that it's done */
        pthread_mutexattr_destroy(&mta);

        /* init the cond variable */
        pthread_cond_init(&iq->todo, NULL);

        /* create the first entry */
        iq->front = (insert_q_node *)malloc(sizeof(insert_q_node));
        if (iq->front == NULL)
                return FALSE;
        iq->rear = iq->front;
        iq->front->next = NULL;

        iq->count = 0;

        return TRUE;
}

/* put an insert into the queue */
int pushq(insert_q *iq, insert_t insert) {
        insert_q_node *new;

        new = (insert_q_node *)malloc(sizeof(insert_q_node));
        if (new == NULL)
                return FALSE;

        new->insert = insert;

        /* this is going on the end of the queue, so nothing after it */
        new->next = NULL;

        /* lock the queue */
        /* TODO add to cancel stack */
        PT_MUTEX_LOCK(&iq->lock);

                iq->rear->next = new;
                iq->rear = new;
                iq->count++;

        /* unlock */
        PT_MUTEX_UNLOCK(&iq->lock);

        /* signal everyone that there is a new insert waiting */
        pthread_cond_broadcast(&iq->todo);

        return TRUE;
}

/* get an insert from the queue; returns NULL if the queue is empty.
   The returned pointer is the dequeued node itself (insert is its first
   member), so the caller must free() it when done. */
insert_t * popq(insert_q *iq) {
        insert_q_node *first;
        insert_t *output = NULL;

        /* lock the queue */
        /* TODO add to cancel stack */
        PT_MUTEX_LOCK(&iq->lock);

                if (iq->count > 0) {
                        /* get the first node, skip the blank starter */
                        first = iq->front->next;
                        /* set the pointer to return */
                        output = &first->insert;

                        /* unlink it so the starter never points at a
                           node the caller is about to free */
                        iq->front->next = first->next;

                        /* last insert? then the queue is empty again */
                        if (first->next == NULL)
                                iq->rear = iq->front;

                        iq->count--;
                }

        /* unlock */
        PT_MUTEX_UNLOCK(&iq->lock);

        return output;
}

void *inserter(void *thread_args)
{
	insert_q *iq = (insert_q *) thread_args;
	insert_t *insert;

	int db_status;

	debug(HIGH, "In inserter!\n");

	if (!(db_init(set))) {
		fatal("** Database error - check configuration.\n");
	}
	/* connect to the database */
	if (!(db_connect(set))) {
		fatal("server not responding.\n");
	}

	while(1)
	{
		/* first lock the mutex */
		PT_MUTEX_LOCK(&iq->lock);

		/* wait until there is work to do; loop because a wakeup
		   (spurious, or a broadcast another inserter already
		   serviced) does not guarantee the queue is non-empty */
		while (iq->count == 0) {
			pthread_cond_wait(&iq->todo, &iq->lock);
		}

		debug(HIGH, "Done waiting!\n");
		debug(HIGH, "Queue = %lu\n", iq->count);

		/* remove a value from the queue */
		insert = popq(iq);

		/* unlock the queue so other threads can work, then insert */
		PT_MUTEX_UNLOCK(&iq->lock);

		if (insert != NULL) {
			debug(HIGH, "table=%s\n", insert->table);
			db_status = db_insert(insert->table, insert->iid, insert->counter, insert->rate);
		}

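		/* popq() hands back the dequeued node itself (insert is its
		   first member), so this frees the node; free(NULL) is a no-op */
		free(insert);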

	}
}



