Previous topic

4.8. Creating A Python Agent

Next topic

4.10. Conventions

This Page

4.9. Stream SupportΒΆ

There is a library that provides storage and streaming services for use by agents. Basic concept:

  1. Create a streamer object, usually SqliteDataStreamer()
  2. addStorage() to create tables that you will place data in
  3. storeData() to put data into the tables
  4. Define a formatter for taking the data and formatting it for a message
  5. stream() to start a new reader process that will periodically gather the latest data and send it
  6. doUpdates() to get all reader processes to actually do something
  7. stopStream() to stop a reader process

Example usage by the logging agent:

# creates the basic stream based on sqlite as the storage medium (most common)
# sqlitename can also be :memory: which uses an inmemory database
logstream = SqliteDataStreamer(sqlitename="logging.db", threadHack=True)

# Now add specify some data we will store in the table logkey
keys = ['name', 'thread', 'level', 'filename', 'fileno', 'msg', 'exception']
types = ['VARCHAR', 'VARCHAR', 'INTEGER', 'VARCHAR', 'INTEGER', 'VARCHAR', 'VARCHAR']
logstream.addStorage("logkey", keys, types)

# Here we add some data to the storage, the variable argcount includes
# a timestamp plus the values in the same order as keys/types
logstream.storeData("logkey", time.time(), record.name, record.threadName, record.levelno, record.filename, record.lineno, msg, tb)

# We also want to define a format function to take the table row data array
# and format it for sending.  In this case, our message contains the
# raw data rows but adds the keys as information for the receiver
def formatter(self, data, userdata):
    return yaml.safe_dump({ testbed.nodename: { 'index': ['time']+keys, 'data':data }})

# When we want to create a new stream of data, we use the stream call
# arguments include destination attributes (nodes, groups, docks),
# a period between packets, and the format function. Optional values include:
#    sqlfile for specifying additional sql for the select call
#    userdata for passing user data to the formatter and the stopStream return
logstream.stream("logkey", dstnodes, dstgroups, ['data', 'log'], period, formatter, sqlfilter=sql, userdata={'level':level})

# Every once and a while you MUST call doUpdates with a messaging interface
# This is when messages are sent.  It returns the nextstamp when it wants to
# send something.  This can be used to determine how often to call the function
# or it just be called 1/second if exepected periods are all reasonable.
nexttimestamp = logstream.doUpdates(now, messenger)

# Finally we stop the stream.
# The return value is the same optional object passed to stream
userdata = logstream.stopStream("logkey")