The library provides data storage and streaming services for use by agents. Basic concept:
Example usage by the logging agent:
# modules used directly by the snippets below
import time
import yaml
# Creates the basic stream, using SQLite as the storage medium (most common).
# sqlitename can also be ":memory:", which uses an in-memory database.
logstream = SqliteDataStreamer(sqlitename="logging.db", threadHack=True)
# Now specify the data we will store in the table logkey
keys = ['name', 'thread', 'level', 'filename', 'fileno', 'msg', 'exception']
types = ['VARCHAR', 'VARCHAR', 'INTEGER', 'VARCHAR', 'INTEGER', 'VARCHAR', 'VARCHAR']
logstream.addStorage("logkey", keys, types)
# Here we add some data to the storage. The variable-length argument list includes
# a timestamp plus the values in the same order as keys/types
logstream.storeData("logkey", time.time(), record.name, record.threadName, record.levelno, record.filename, record.lineno, msg, tb)
# We also want to define a format function to take the table row data array
# and format it for sending. In this case, our message contains the
# raw data rows but adds the keys as information for the receiver
def formatter(self, data, userdata):
    return yaml.safe_dump({testbed.nodename: {'index': ['time'] + keys, 'data': data}})
# When we want to create a new stream of data, we use the stream call.
# Arguments include the destination attributes (nodes, groups, docks),
# a period between packets, and the format function. Optional values include:
# sqlfilter for specifying additional SQL for the select call
# userdata for passing user data to the formatter and as the stopStream return value
logstream.stream("logkey", dstnodes, dstgroups, ['data', 'log'], period, formatter, sqlfilter=sql, userdata={'level':level})
# Every once in a while you MUST call doUpdates with a messaging interface.
# This is when messages are sent. It returns the next timestamp at which it wants to
# send something. This can be used to determine how often to call the function,
# or it can just be called once per second if the expected periods are all reasonable.
nexttimestamp = logstream.doUpdates(now, messenger)
# Finally we stop the stream.
# The return value is the same optional object passed to stream
userdata = logstream.stopStream("logkey")
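
As a rough illustration of the store/stream/update cycle shown above, the following self-contained sketch mimics it with the standard sqlite3 module. It is not the library's implementation: MiniStreamer, its method names, the rowid bookkeeping, the send callback standing in for the messaging interface, and the use of JSON in place of YAML are all assumptions made for this example.

```python
import json
import sqlite3


class MiniStreamer:
    """Hypothetical toy stand-in for the streamer: one SQLite table per key."""

    def __init__(self, sqlitename=":memory:"):
        self.db = sqlite3.connect(sqlitename)
        self.streams = {}  # table name -> stream state

    def add_storage(self, table, keys, types):
        # Every row gets a leading timestamp column, then the caller's columns.
        # Table and column names are trusted input in this sketch.
        cols = ", ".join(f"{k} {t}" for k, t in zip(keys, types))
        self.db.execute(f"CREATE TABLE IF NOT EXISTS {table} (time REAL, {cols})")

    def store_data(self, table, *values):
        # values = timestamp followed by one value per key, in key order
        marks = ", ".join("?" for _ in values)
        self.db.execute(f"INSERT INTO {table} VALUES ({marks})", values)

    def stream(self, table, period, formatter, sqlfilter=None, userdata=None):
        self.streams[table] = {
            "period": period, "formatter": formatter, "sqlfilter": sqlfilter,
            "userdata": userdata, "next": 0.0, "lastrow": 0,
        }

    def do_updates(self, now, send):
        """Send unsent rows for every due stream; return the next due time."""
        for table, s in self.streams.items():
            if now < s["next"]:
                continue
            sql = f"SELECT rowid, * FROM {table} WHERE rowid > ?"
            if s["sqlfilter"]:
                sql += f" AND ({s['sqlfilter']})"
            rows = self.db.execute(sql + " ORDER BY rowid", (s["lastrow"],)).fetchall()
            if rows:
                s["lastrow"] = rows[-1][0]  # remember the last row already sent
                send(s["formatter"]([list(r[1:]) for r in rows], s["userdata"]))
            s["next"] = now + s["period"]
        return min((s["next"] for s in self.streams.values()), default=None)

    def stop_stream(self, table):
        # Hand back the same userdata object that was passed to stream()
        return self.streams.pop(table)["userdata"]


keys = ["name", "level", "msg"]
types = ["VARCHAR", "INTEGER", "VARCHAR"]


def formatter(data, userdata):
    # Raw rows plus the column names, so the receiver can interpret them
    return json.dumps({"index": ["time"] + keys, "data": data})


sent = []  # collects outgoing messages in place of a real messenger
streamer = MiniStreamer()
streamer.add_storage("logkey", keys, types)
streamer.store_data("logkey", 1.0, "agent", 20, "started")
streamer.store_data("logkey", 2.0, "agent", 10, "debug noise")
streamer.stream("logkey", 5.0, formatter, sqlfilter="level >= 20", userdata={"level": 20})
next_due = streamer.do_updates(10.0, sent.append)
userdata = streamer.stop_stream("logkey")
```

In this sketch the value returned by do_updates is the earliest time any stream wants to send again, so a caller could sleep until then instead of polling once per second.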