baldaquin.buf
— Event buffering#
The module provides all the necessary facilities to buffer the binary data coming from the hardware and make sure they get written to disk in the intended way.
Buffers#
The class hierarchy within the module revolves around the AbstractBuffer abstract class, representing a collection of packets that supports insertion and removal in constant time.
This abstract class requires the following interfaces to be defined:
- _do_put() to insert a single packet into the buffer (the actual hook to insert packets is called put(), but this is implemented in the abstract class so that we can consistently enforce that the packets being inserted into the buffer are of the proper type, and subclasses implement the specific semantics of the method in _do_put());
- pop() to retrieve (and remove) a single packet from the buffer;
- size() returning the number of events in the buffer at any given time;
- clear() to clear the buffer.
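The split between put() and _do_put() described above can be sketched with the standard library alone. This is a minimal illustration, not the baldaquin implementation: SketchPacket, SketchBuffer and SketchDequeBuffer are hypothetical names introduced here, and the choice of raising TypeError on a wrong packet type is an assumption.

```python
from abc import ABC, abstractmethod
from collections import deque


class SketchPacket:
    """Hypothetical stand-in for the packet base class."""

    def __init__(self, payload: bytes) -> None:
        self.payload = payload


class SketchBuffer(ABC):
    """Hypothetical base class mirroring the interface listed above."""

    def put(self, packet: SketchPacket) -> None:
        # The type check lives here, once, for all the subclasses
        # (raising TypeError is an assumption of this sketch).
        if not isinstance(packet, SketchPacket):
            raise TypeError(f"{packet!r} is not a SketchPacket")
        self._do_put(packet)

    @abstractmethod
    def _do_put(self, packet: SketchPacket) -> None: ...

    @abstractmethod
    def pop(self) -> SketchPacket: ...

    @abstractmethod
    def size(self) -> int: ...

    @abstractmethod
    def clear(self) -> None: ...


class SketchDequeBuffer(SketchBuffer):
    """Concrete FIFO-like buffer backed by a deque."""

    def __init__(self) -> None:
        self._items = deque()

    def _do_put(self, packet: SketchPacket) -> None:
        self._items.append(packet)

    def pop(self) -> SketchPacket:
        # Pop from the left, i.e., first in, first out.
        return self._items.popleft()

    def size(self) -> int:
        return len(self._items)

    def clear(self) -> None:
        self._items.clear()


sketch_buffer = SketchDequeBuffer()
sketch_buffer.put(SketchPacket(b"\x01"))
sketch_buffer.put(SketchPacket(b"\x02"))
first = sketch_buffer.pop()
```

With this layout a subclass only decides how a packet is stored, while the type enforcement cannot be bypassed by forgetting to repeat it in each subclass.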
The module then provides two concrete subclasses, FIFO and CircularBuffer, both acting as FIFOs, that can be used in practice.
Both concrete classes are thread-safe, as a buffer is in general accessed by multiple threads. The basic usage looks like
>>> from baldaquin.buf import FIFO
>>> buffer = FIFO()
>>> print(buffer.size())
0
>>> buffer.put(packet)  # packet: any AbstractPacket instance
>>> print(buffer.size())
1
>>> packet = buffer.pop()
>>> print(buffer.size())
0
In addition, the base class provides a flush() method that will write the entire buffer content to file (more on the internals of this mechanism in the next section).
The concrete buffer classes make no attempt at synchronizing the I/O, but they do provide useful hooks for external code to figure out whether a flush() operation is needed.
More specifically:
- almost_full() returns True when the number of events in the buffer exceeds the flush_size value passed to the constructor;
- time_since_last_flush() returns the time (in seconds) elapsed since the last flush() call;
- flush_needed() returns True when either the buffer is almost full or the time since the last flush exceeds the flush_interval value passed to the constructor.
By using the proper combination of flush_size and flush_interval it is possible to achieve different effects, e.g., if flush_size is None, then the I/O will effectively happen at regular time intervals, according to the flush_interval value.
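The interplay between the size and time criteria can be sketched as follows. FlushPolicy is a hypothetical helper written for this illustration, not a class in the module; in particular, treating flush_size=None as "size criterion disabled" and using time.monotonic() as the clock are assumptions consistent with the description above.

```python
import time


class FlushPolicy:
    """Hypothetical helper reproducing the logic of the flush-related hooks."""

    def __init__(self, flush_size=None, flush_interval=1.0):
        self.flush_size = flush_size
        self.flush_interval = flush_interval
        self._last_flush = time.monotonic()

    def almost_full(self, size):
        # With flush_size=None the size criterion never triggers.
        return self.flush_size is not None and size > self.flush_size

    def time_since_last_flush(self):
        return time.monotonic() - self._last_flush

    def flush_needed(self, size):
        # Either criterion is enough to request a flush.
        return (self.almost_full(size)
                or self.time_since_last_flush() > self.flush_interval)

    def mark_flushed(self):
        self._last_flush = time.monotonic()


# Size-driven: the (very long) interval is effectively never reached.
size_driven = FlushPolicy(flush_size=2, flush_interval=3600.0)

# Time-driven: no size threshold, flush roughly every 5 seconds.
time_driven = FlushPolicy(flush_size=None, flush_interval=5.0)
```

External code would typically poll flush_needed() from the thread in charge of the I/O and call flush() when it returns True.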
Sinks#
The operation of writing the buffer to file(s) is handled through the Sink concept. A sink is a descriptor that can be attached to a buffer, and encapsulates all the information that is necessary to write the packets out in a suitable format.
To be flushed, a buffer must have at least a primary sink attached to it, which by default writes the packets verbatim to the output file in binary form.
>>> buffer = CircularBuffer()
>>> buffer.set_primary_sink(file_path)
>>> ...
>>> buffer.flush() # <- This will write to disk the packets in the buffer.
In addition, custom sinks can be added to a buffer, specifying the path to the output file, the write mode, the packet formatter and an optional header. By using the proper combination of sinks, one can achieve useful things, such as writing the data out in binary format and, at the same time, writing a text file where the packets are formatted with a suitable ASCII representation, e.g.,
>>> buffer = CircularBuffer()
>>> buffer.set_primary_sink(binary_file_path)
>>>
>>> # Define a custom sink---the packet class should have a ``as_text()`` hook
>>> # returning a suitable text representation of the thing
>>> buffer.add_custom_sink(text_file_path, WriteMode.TEXT, Packet.as_text, header)
>>>
>>> # ... fill the buffer with some packets.
>>>
>>> buffer.flush() # <- This will write two output files!
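To make the idea of one flush feeding several sinks more concrete, here is a self-contained sketch that does not use the baldaquin classes. SketchSink and flush_to_sinks() are hypothetical names, raw bytes objects stand in for packets, and opening the files in append mode is an assumption of this sketch.

```python
import tempfile
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional


@dataclass
class SketchSink:
    """Hypothetical sink: where to write, in which mode, and how to format."""

    file_path: Path
    mode: str  # "b" or "t", as in the open() builtin
    formatter: Optional[Callable] = None


def flush_to_sinks(packets: deque, sinks: list) -> int:
    """Write the current packets to every sink, then remove them."""
    snapshot = list(packets)
    for sink in sinks:
        # Append mode (an assumption), so successive flushes accumulate.
        with open(sink.file_path, f"a{sink.mode}") as output_file:
            for packet in snapshot:
                data = sink.formatter(packet) if sink.formatter else packet
                output_file.write(data)
    # Remove only the packets that were actually written out.
    for _ in snapshot:
        packets.popleft()
    return len(snapshot)


out_dir = Path(tempfile.mkdtemp())
binary_sink = SketchSink(out_dir / "data.bin", "b")
text_sink = SketchSink(out_dir / "data.txt", "t", lambda p: p.hex() + "\n")

packets = deque([b"\x01\x02", b"\x03\x04"])
num_written = flush_to_sinks(packets, [binary_sink, text_sink])
```

After the call the binary file holds the four raw bytes, while the text file holds one hexadecimal line per packet.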
Module documentation#
Data buffering.
- class baldaquin.buf.WriteMode(value)[source]#
Small enum class for the file write mode.
Note this has to match the open modes in the Python open() builtin.
- BINARY = 'b'#
- TEXT = 't'#
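The reason the enum values have to match the open() modes is that they can be appended directly to the base mode string. The sketch below shows the idea with a hypothetical copy of the enum (SketchWriteMode) and a hypothetical helper (open_for_append()); whether the module itself opens files for appending or for writing is not stated here, so the "a" prefix is an assumption.

```python
import tempfile
from enum import Enum
from pathlib import Path


class SketchWriteMode(Enum):
    """Hypothetical copy of the enum: the values are open() mode suffixes."""

    BINARY = "b"
    TEXT = "t"


def open_for_append(file_path, mode):
    # "a" + "b" -> "ab" and "a" + "t" -> "at": both are valid open() modes.
    return open(file_path, f"a{mode.value}")


sketch_path = Path(tempfile.mkdtemp()) / "sketch.bin"
with open_for_append(sketch_path, SketchWriteMode.BINARY) as output_file:
    output_file.write(b"\x00\x01")
```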
- class baldaquin.buf.Sink(file_path: Path, mode: WriteMode, formatter: Callable | None = None, header: Any | None = None)[source]#
Small class describing a file sink where a buffer can be flushed.
- Parameters:
file_path (Path or str) – The path to the output file.
mode (WriteMode) – The write mode (WriteMode.BINARY or WriteMode.TEXT).
formatter (callable, optional) – The packet formatting function to be used to flush a buffer.
header (anything that can be written to the output file) – The optional file header. If not None, this gets written to the output file when the sink is created.
- class baldaquin.buf.AbstractBuffer(max_size: int, flush_size: int, flush_interval: float)[source]#
Abstract base class for a data buffer.
- Parameters:
max_size (int) – The maximum number of packets the buffer can physically contain.
flush_size (int) – The maximum number of packets before a flush_needed() call returns True (mind this should be smaller than max_size because otherwise the buffer will generally drop packets).
flush_interval (float) – The maximum time (in s) elapsed since the last flush() before a flush_needed() call returns True.
- put(packet: AbstractPacket) None [source]#
Put a packet into the buffer.
Note we check in the abstract class that the packet we put into the buffer is an AbstractPacket instance, while the actual work is done in the _do_put() method, which is abstract and should be reimplemented in all derived classes.
- abstract _do_put(packet: AbstractPacket) None [source]#
Abstract method with the actual code to put a packet into the buffer (to be reimplemented in derived classes).
Warning
In case you wonder why this is called _do_put() and not, e.g., _put()… well: whoever designed the queue.Queue class (which we rely on for one of the actual buffer implementations) apparently had the same brilliant idea of delegating the put() call to a _put() function, and overloading that name was putting things into an infinite loop.
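The name clash mentioned in the warning is easy to verify with the standard library: queue.Queue.put() calls the internal self._put() hook to store the item. The sketch below, where RecordingQueue is a hypothetical class written for this illustration, overrides _put() only to record that it is invoked. A buffer class deriving from queue.Queue whose own _put() called back into put() would therefore re-enter itself, which is presumably the loop described above.

```python
import queue


class RecordingQueue(queue.Queue):
    """Queue subclass recording every call to the internal _put() hook."""

    def __init__(self):
        super().__init__()
        self.put_hook_calls = []

    def _put(self, item):
        # Invoked by queue.Queue.put() to actually store the item.
        self.put_hook_calls.append(item)
        super()._put(item)


recording_queue = RecordingQueue()
recording_queue.put("packet")
```

Using a distinct name such as _do_put() for the buffer hook keeps the two mechanisms apart.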
- abstract pop() Any [source]#
Pop an item from the buffer (to be reimplemented in derived classes).
Note
The specific semantics of which item is returned (e.g., the first, last, or something more clever) is delegated to the concrete classes, but we will be mostly dealing with FIFOs, i.e., unless otherwise stated it should be understood that this is popping items from the left of the queue.
- abstract size() int [source]#
Return the number of items in the buffer (to be reimplemented in derived classes).
- time_since_last_flush()[source]#
Return the time (in s) since the last flush operation, or since the buffer creation, in case it has never been flushed.
- add_custom_sink(file_path: Path, mode: WriteMode, formatter: Callable | None = None, header: Any | None = None) Sink [source]#
Add a sink to the buffer.
See the Sink class constructor for an explanation of the arguments.
- _pop_and_write_raw(num_packets: int, output_file: IOBase) int [source]#
Pop the first num_packets packets from the buffer and write the corresponding raw binary data into the given output file.
- Parameters:
num_packets (int) – The number of packets to be written out.
output_file (io.IOBase) – The output binary file.
- Returns:
The number of bytes written to disk.
- Return type:
int
- _write(num_packets: int, output_file: IOBase, formatter: Callable) int [source]#
Write the first num_packets packets from the buffer to the given output file (and with the given formatter) without popping them.
- Parameters:
num_packets (int) – The number of packets to be written out.
output_file (io.IOBase) – The output binary file.
formatter (Callable) – The packet formatting function.
- Returns:
The number of bytes written to disk.
- Return type:
int
- flush() tuple[int, int] [source]#
Write the content of the buffer to all the sinks connected.
Note
This will write all the items in the buffer at the time of the function call, i.e., items added while writing to disk will need to wait for the next call.
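One way to obtain the behavior described in the note is to freeze the number of items at the beginning of the call and pop exactly that many. The sketch below is an assumption about how such a guarantee can be implemented, not a copy of the actual method; flush_snapshot() and write_and_enqueue() are hypothetical names, and the callback simulates a producer adding items while the flush is in progress.

```python
from collections import deque


def flush_snapshot(items: deque, write) -> int:
    """Write out only the items that were in the deque at call time."""
    num_items = len(items)  # frozen here; later additions are ignored
    for _ in range(num_items):
        write(items.popleft())
    return num_items


pending = deque([1, 2, 3])
written = []


def write_and_enqueue(item):
    written.append(item)
    # Simulate a producer adding a new item during the flush.
    pending.append(item + 100)


num_flushed = flush_snapshot(pending, write_and_enqueue)
```

The three items added during the call are left in the deque and will be picked up by the next flush.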
- _abc_impl = <_abc._abc_data object>#
- class baldaquin.buf.FIFO(max_size: int | None = None, flush_size: int | None = None, flush_interval: float = 1.0)[source]#
Implementation of a FIFO.
This is using the queue module in the Python standard library.
Note that the queue.Queue class is internally using a collections.deque object, so this is effectively another layer of complexity over the CircularBuffer class below. It’s not entirely clear to me what the real difference would be, in a multi-threaded context.
- _do_put(packet: AbstractPacket, block: bool = True, timeout: float | None = None) None [source]#
Overloaded method.
See https://docs.python.org/3/library/queue.html for the meaning of the function arguments.
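As a quick reminder of what the block and timeout arguments do in the underlying queue.Queue.put() call, the following standard-library sketch fills a bounded queue and then tries to add one more item in both non-blocking and blocking-with-timeout fashion. The variable names are illustrative only.

```python
import queue

bounded_queue = queue.Queue(maxsize=1)
bounded_queue.put("first")  # fits, the queue is now full

# block=False: fail immediately if there is no free slot.
try:
    bounded_queue.put("second", block=False)
    overflowed = False
except queue.Full:
    overflowed = True

# block=True with a timeout: wait up to 0.05 s for a free slot, then fail.
try:
    bounded_queue.put("second", block=True, timeout=0.05)
    timed_out = False
except queue.Full:
    timed_out = True
```

Both attempts raise queue.Full here, since nothing is consuming from the queue.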
- _abc_impl = <_abc._abc_data object>#
- class baldaquin.buf.CircularBuffer(max_size: int | None = None, flush_interval: float = 1.0, flush_size: int | None = None)[source]#
Implementation of a simple circular buffer.
This is a simple subclass of the Python collections.deque data structure, adding I/O facilities on top of the base class.
Verbatim from the Python documentation: deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction. For completeness, the idea of using a deque to implement a circular buffer comes from https://stackoverflow.com/questions/4151320
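The circular behavior of a bounded deque can be seen in a few lines of standard-library code. That CircularBuffer forwards max_size to the deque maxlen argument is an assumption here, but it would explain why flush_size should stay below max_size: once the deque is full, each append silently discards the oldest item.

```python
from collections import deque

ring = deque(maxlen=3)
for value in range(5):
    # Once three items are stored, each append drops the oldest one.
    ring.append(value)

# Items 0 and 1 have been dropped; the oldest surviving item is 2.
oldest = ring.popleft()
```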
- _do_put(packet: AbstractPacket) None [source]#
Overloaded method.
- _abc_impl = <_abc._abc_data object>#