RT Streams

One peculiar feature of ChibiOS/RT is Streams: this is not a functional subsystem but a generic interface declaration for readable and writable objects.

Description

Streams are an abstraction over objects that need to implement read and write functionalities. Even though ChibiOS is written in C, its architecture is strongly object-oriented: there are classes of objects, such as semaphores and queues, and there are also abstract interfaces; streams are an example of such an interface.

The ancestor of all stream-like interfaces is the Sequential Stream.

Sequential Stream

This is the interface for objects that can be read and written with blocking semantics. Methods of this interface return only when the specified amount of data has been read or written; the behaviour is always blocking.

API

chSequentialStreamWrite() Writes data to a stream.
chSequentialStreamRead() Reads data from a stream.
chSequentialStreamPut() Writes a single byte to a stream.
chSequentialStreamGet() Reads a single byte from a stream.

Note that the above are not C functions but methods of the interface; the first parameter is conventionally a pointer to an object (basically a structure) implementing the Sequential Stream interface.

Examples

Extending Streams

One important feature is that we can extend interfaces by writing interfaces or classes that inherit from them. In this example we will see how to write a new interface that inherits from Sequential Streams and adds non-blocking methods; the new interface is called BaseChannel and is meant as the ancestor of I/O-oriented interfaces.

Base Channels

The code:

/*
 * BaseChannel specific methods.
 * Note, the macro _base_sequential_stream_methods inherits the methods of
 * the parent interface, then new methods are added.
 *
 * The macro expands to a list of function pointer members and is meant to
 * be used inside a virtual methods table structure. Every method receives
 * the object instance as its first parameter and a timeout specification
 * as its last parameter.
 */
#define _base_channel_methods                                               \
  _base_sequential_stream_methods                                           \
  /* Channel put method with timeout specification.*/                       \
  msg_t (*putt)(void *instance, uint8_t b, systime_t time);                 \
  /* Channel get method with timeout specification.*/                       \
  msg_t (*gett)(void *instance, systime_t time);                            \
  /* Channel write method with timeout specification.*/                     \
  size_t (*writet)(void *instance, const uint8_t *bp,                       \
                   size_t n, systime_t time);                               \
  /* Channel read method with timeout specification.*/                      \
  size_t (*readt)(void *instance, uint8_t *bp, size_t n, systime_t time);
 
/**
 * BaseChannel specific data.
 * Note, BaseChannel adds no data fields of its own because it is only an
 * interface without implementation; the macro expands just to the data
 * inherited from the parent interface (_base_sequential_stream_data).
 */
#define _base_channel_data                                                  \
  _base_sequential_stream_data
 
/**
 * BaseChannel virtual methods table.
 * Contains one function pointer for each method listed by the
 * _base_channel_methods macro.
 */
struct BaseChannelVMT {
  _base_channel_methods
};
 
/**
 * Base channel interface.
 * This interface represents a generic, byte-wide, I/O channel.
 * An object is made of a pointer to its virtual methods table followed by
 * the data fields listed by the _base_channel_data macro.
 */
typedef struct {
  /** @brief Virtual Methods Table.*/
  const struct BaseChannelVMT *vmt;
  _base_channel_data
} BaseChannel;
 
/**
 * Channel blocking byte write with timeout.
 * Invokes the putt method found in the virtual methods table of the object.
 *
 * ip        pointer to an object whose vmt provides putt; the argument is
 *           expanded twice, so it must be free of side effects
 * b         the byte to be written
 * time      the timeout specification handed over to the method
 */
#define chnPutTimeout(ip, b, time)                                          \
  ((ip)->vmt->putt((ip), (b), (time)))
 
/**
 * Channel blocking byte read with timeout.
 * Invokes the gett method found in the virtual methods table of the object.
 *
 * ip        pointer to an object whose vmt provides gett; the argument is
 *           expanded twice, so it must be free of side effects
 * time      the timeout specification handed over to the method
 */
#define chnGetTimeout(ip, time)                                             \
  ((ip)->vmt->gett((ip), (time)))
 
/**
 * Channel blocking write with timeout.
 * Invokes the writet method found in the virtual methods table of the
 * object.
 *
 * ip        pointer to an object whose vmt provides writet; the argument
 *           is expanded twice, so it must be free of side effects
 * bp        pointer to the data to be written
 * n         number of bytes to be written
 * time      the timeout specification handed over to the method
 */
#define chnWriteTimeout(ip, bp, n, time)                                    \
  ((ip)->vmt->writet((ip), (bp), (n), (time)))
 
/**
 * Channel blocking read with timeout.
 * Invokes the readt method found in the virtual methods table of the
 * object.
 *
 * ip        pointer to an object whose vmt provides readt; the argument is
 *           expanded twice, so it must be free of side effects
 * bp        pointer to the buffer receiving the data
 * n         number of bytes to be read
 * time      the timeout specification handed over to the method
 */
#define chnReadTimeout(ip, bp, n, time)                                     \
  ((ip)->vmt->readt((ip), (bp), (n), (time)))
/** @} */

The above code is a bit hard to understand, but this is the price to pay in order to implement object-oriented programming concepts in a language not designed to support them.

Implementing Streams

The next logical step is to implement the sequential streams interface in a usable class. This example implements a Memory Stream, a stream able to read from and write to a memory buffer.

Memory Streams

The code:

/*
 * memstreams.h header.
 */
 
/*
 * MemStream specific data.
 * The macro first inherits the data of the parent interface through
 * _base_sequential_stream_data, then adds the fields describing the memory
 * buffer and the current write/read positions inside it.
 */
#define _memory_stream_data                                                 \
  _base_sequential_stream_data                                              \
  /* Pointer to the stream buffer.*/                                        \
  uint8_t               *buffer;                                            \
  /* Total size of the stream buffer.*/                                     \
  size_t                size;                                               \
  /* Current end of stream, next write offset.*/                            \
  size_t                eos;                                                \
  /* Current read offset.*/                                                 \
  size_t                offset;
 
/*
 * MemStream virtual methods table, nothing added.
 * The table contains only the methods inherited through
 * _base_sequential_stream_methods.
 */
struct MemStreamVMT {
  _base_sequential_stream_methods
};
 
/**
 * Memory stream object.
 * An object is made of a pointer to its virtual methods table followed by
 * the data fields listed by the _memory_stream_data macro.
 */
typedef struct {
  /** @brief Virtual Methods Table.*/
  const struct MemStreamVMT *vmt;
  _memory_stream_data
} MemoryStream;
 
/*
 * Memory stream object initialization.
 *
 * msp       pointer to the MemoryStream object to be initialized
 * buffer    pointer to the memory buffer for the memory stream
 * size      total size of the memory stream buffer
 * eos       initial End Of Stream offset
 */
void msObjectInit(MemoryStream *msp, uint8_t *buffer,
                  size_t size, size_t eos);
/*
 * memstreams.c implementation.
 */
 
#include <string.h>
 
#include "ch.h"
#include "memstreams.h"
 
/* Methods implementations.*/
/*
 * Stream write method: appends data at the current end of stream.
 * The request is truncated to the room left between the end of stream and
 * the end of the buffer.
 *
 * ip        pointer to the MemoryStream object
 * bp        pointer to the data to be copied into the stream
 * n         number of bytes requested
 *
 * Returns the number of bytes actually copied, possibly less than n.
 */
static size_t writes(void *ip, const uint8_t *bp, size_t n) {
  MemoryStream *stream = ip;
  size_t room = stream->size - stream->eos;

  if (n > room) {
    n = room;
  }
  memcpy(&stream->buffer[stream->eos], bp, n);
  stream->eos += n;
  return n;
}
 
/*
 * Stream read method: copies data starting from the current read offset.
 * The request is truncated to the bytes available between the read offset
 * and the end of stream.
 *
 * ip        pointer to the MemoryStream object
 * bp        pointer to the buffer receiving the data
 * n         number of bytes requested
 *
 * Returns the number of bytes actually copied, possibly less than n.
 */
static size_t reads(void *ip, uint8_t *bp, size_t n) {
  MemoryStream *stream = ip;
  size_t available = stream->eos - stream->offset;

  if (n > available) {
    n = available;
  }
  memcpy(bp, &stream->buffer[stream->offset], n);
  stream->offset += n;
  return n;
}
 
/*
 * Stream put method: appends a single byte at the current end of stream.
 *
 * ip        pointer to the MemoryStream object
 * b         the byte to be stored
 *
 * Returns MSG_OK when the byte has been stored, MSG_RESET when the buffer
 * has no room left.
 */
static msg_t put(void *ip, uint8_t b) {
  MemoryStream *msp = ip;

  /* The offsets are compared directly: both fields are unsigned, so the
     difference size - eos can never be negative and would wrap to a huge
     value, letting the store go out of bounds, if eos were past size.*/
  if (msp->eos >= msp->size) {
    return MSG_RESET;
  }
  msp->buffer[msp->eos] = b;
  msp->eos += 1;
  return MSG_OK;
}
 
/*
 * Stream get method: fetches a single byte from the current read offset.
 *
 * ip        pointer to the MemoryStream object
 *
 * Returns the byte value when one is available, MSG_RESET when the read
 * offset has reached the end of stream.
 */
static msg_t get(void *ip) {
  MemoryStream *msp = ip;
  uint8_t b;

  /* The offsets are compared directly: both fields are unsigned, so the
     difference eos - offset can never be negative and would wrap to a huge
     value, letting the load go out of bounds, if offset were past eos.*/
  if (msp->offset >= msp->eos) {
    return MSG_RESET;
  }
  b = msp->buffer[msp->offset];
  msp->offset += 1;
  return b;
}
 
/* Virtual methods table of the memory streams. The initializers are
   positional, so they must follow the order in which the members are
   declared by _base_sequential_stream_methods.*/
static const struct MemStreamVMT vmt = {
  writes,
  reads,
  put,
  get
};
 
/*
 * Prepares a MemoryStream object for use.
 * The object is bound to the memory streams virtual methods table and to
 * the supplied buffer; the read offset always starts from zero.
 *
 * msp       pointer to the MemoryStream object to be initialized
 * buffer    pointer to the memory buffer backing the stream
 * size      total size of the memory buffer
 * eos       initial End Of Stream offset, stored as the position where
 *           the stream currently ends
 */
void msObjectInit(MemoryStream *msp, uint8_t *buffer,
                  size_t size, size_t eos) {

  msp->offset = 0;
  msp->eos    = eos;
  msp->size   = size;
  msp->buffer = buffer;
  msp->vmt    = &vmt;
}