Let's Talk
0.2
Simple DDS communication
Let's Talk is a C++ communication library compatible with DDS (the Data Distribution Service) designed for simple and efficient interprocess coordination on local networks. The library provides a simple API for publish/subscribe, request/reply, and reactor communication patterns, along with some concurrency tools. Let's Talk also ships a self-contained distribution of FastDDS designed to be built inline with your project. CMake support for DDS is provided through macros designed to make handling IDL files as painless as possible. Its guiding principle is that simple things should be easy. It tries to adopt sensible defaults while providing access to more functionality through optional arguments.
Here's the basic "hello world" example from Fast DDS using the Let's Talk API. The subscriber application:
and the publisher application:
Subscription involves little more than providing a callback function (typically a lambda) and a topic name. Publication involves creating a Publisher object, then calling publish() to send messages. The publish/subscribe coding is simple and thread-safe; no byzantine class hierarchies, obscure quality of service settings, nothing.
The design of the main Participant API is based on the ignition::transport API (but Let's Talk is not ignition::transport compatible). Publish/subscribe is compatible with other DDS vendors (RTI Connext, Cyclone, etc.) to the extent that FastDDS is compatible. The request/reply and reactor patterns are not compatible outside of Let's Talk.
The easiest way to use Let's Talk is as a git submodule. You can add it as a submodule via
This will create the directory LetsTalk. In your CMakeLists.txt, add
This will provide the following cmake targets:
letstalk – The library (with appropriate includes)
fastdds – The underlying FastDDS library
To include and link myTarget to Let's Talk, you just need to add the cmake
(The letstalk target depends on fastdds, so you don't need to reference fastdds directly.)
If you have several projects that depend on Let's Talk, it is more efficient to install the library per usual. In this case, check out the code and do
This will provide a cmake config script at <your install dir>/lib/cmake/letstalk that you can use in your cmake like
This will provide the same cmake targets (letstalk and fastdds) for linking as above.
Working with IDL in Let's Talk is especially easy. Inspired by the protobuf cmake support, Let's Talk provides an "IdlTarget.cmake" macro that is included when the Let's Talk module is imported. Basic operation is
That is, IdlTarget creates a cmake target consisting of a library built from the provided compiled IDLs, linking transitively to all required libraries, and providing access to the header include path as a target property. The header and source cpp files are written in the build directory. If the IDL is changed, make/ninja will correctly re-run the IDL compiler, recompile generated source, and re-link. The intention is to have machine-generated code segregated from the rest of the code base. The full form is
where
target_name will be the name of the new target created and what you will need to link to
SHARED forces the created library to be a shared library
JSON generates to/from JSON serialization methods, accessed through the header MyIdlJsonSupport.hpp
PATH specifies the relative path where the generated code will be placed. This can be used to change the include path. Setting PATH foo/bar will change #include "MyIdl.hpp" to #include "foo/bar/MyIdl.hpp"
INCLUDE specifies additional include paths for IDL compilation
SOURCE gives the list of IDL files that comprise the resultant library. These are used to generate code, which is then compiled and linked into the library.
Let's Talk offers three communication patterns: publish/subscribe, request/reply, and reactor.
See below for more details on each pattern. The examples directory provides sample code for each pattern.
In pub/sub, a group of publishers send data to all subscribers that match on a topic. Topics are strings (e.g. "robot.motion.command") combined with a type. Publishers are only matched to subscribers if both the topic name and topic types match. To subscribe, you register a callback with a participant,
or, if you wish to get samples as unique_ptrs (because you plan to move them to another thread),
IMPORTANT: This callback is run on an internal thread, so long calculations or waiting for a lock will negatively impact the whole system. If you do need to do significant processing, Let's Talk provides a ThreadSafeQueue class and a convenience subscription mode,
Rather than running a callback, you get a pointer to the queue of messages. The queue then supplies pop() to get a sample (with an optional wait time) and popAll() to get all pending samples,
where the Queue type is a std::deque<std::unique_ptr<T>> by default. This puts you in charge of when to cause your thread to wait for data.
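To make the pop()/popAll() behavior concrete, here is a minimal sketch of a queue with the same shape, built only on the standard library. It is not the Let's Talk ThreadSafeQueue implementation: the class name SketchQueue, the push() method, and the exact signatures are assumptions made for illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Illustrative stand-in only: NOT the Let's Talk ThreadSafeQueue.
template <class T>
class SketchQueue {
public:
    using Queue = std::deque<std::unique_ptr<T>>;

    // Producer side. In Let's Talk this role is played by the internal
    // subscriber thread; push() is a hypothetical name for this sketch.
    void push(std::unique_ptr<T> sample) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(sample));
        }
        ready_.notify_one();
    }

    // Wait up to `wait` for a sample; returns nullptr if none arrives in time.
    std::unique_ptr<T> pop(std::chrono::milliseconds wait = std::chrono::milliseconds(0)) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!ready_.wait_for(lock, wait, [this] { return !queue_.empty(); }))
            return nullptr;
        std::unique_ptr<T> sample = std::move(queue_.front());
        queue_.pop_front();
        return sample;
    }

    // Take every pending sample at once by swapping under the lock.
    Queue popAll() {
        Queue out;
        std::lock_guard<std::mutex> lock(mutex_);
        out.swap(queue_);
        return out;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    Queue queue_;
};
```

The consumer thread decides when to block: pop() with a wait time sleeps until data arrives or the time runs out, while popAll() never blocks and hands back whatever has accumulated.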
If you want to service multiple queues from one thread, you can use the Waitset class. First, register all the queues with the waitset in the constructor:
Then, you may wait for data to arrive in any of the queues. This blocks the calling thread and returns the index of the first queue with pending messages:
See example/waitset for a detailed design. Note that the returned index represents the first queue that has data. Higher-numbered queues may also have data. The best practice is to use a fall-through design to check all of the higher indexed queues as well.
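The fall-through design can be sketched with a plain switch statement. The code below does not use the Waitset class itself: the parameter `first` stands in for the index a wait call would return, and the Queue alias, the queue names, and serviceFrom() are hypothetical.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

using Queue = std::deque<std::string>;  // simplified stand-in for a message queue

// `first` plays the role of the index returned by the waitset.
// Returns the indices of the queues that actually had data, in order.
std::vector<std::size_t> serviceFrom(std::size_t first, Queue& commands,
                                     Queue& status, Queue& logs) {
    std::vector<std::size_t> serviced;
    auto drain = [&serviced](std::size_t index, Queue& q) {
        if (q.empty()) return;
        q.clear();  // a real handler would process each message here
        serviced.push_back(index);
    };
    switch (first) {
        case 0: drain(0, commands); [[fallthrough]];
        case 1: drain(1, status);   [[fallthrough]];
        case 2: drain(2, logs);     break;
        default: break;  // no queue ready, e.g. an out-of-range index
    }
    return serviced;
}
```

Starting at the returned index and falling through means one wake-up services every higher-indexed queue that also has data, instead of returning to the wait call once per queue.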
To cancel a subscription, the Participant provides an unsubscribe function,
You can also query how many publishers have been discovered for the topic,
For publishing, Participant acts as a factory for creating lightweight Publisher objects,
To use it,
or via unique_ptr,
Publisher erases the MyType information (i.e. all Publishers are formally the same type), but publishing a type other than MyType will result in an error. Calling pub.topicType() will return the type name as a string. You can check for subscribers using
To stop advertising data, simply dispose of the Publisher object.
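For readers unfamiliar with type erasure, the following sketch shows one common way a non-template publisher class can still reject the wrong message type at run time. It is not how Let's Talk implements Publisher: ErasedPublisher, create(), sent(), and the use of an exception as the "error" are all assumptions for illustration.

```cpp
#include <stdexcept>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <utility>

struct MyType { int value; };  // example message type for this sketch

// Hypothetical stand-in: one concrete class regardless of message type.
class ErasedPublisher {
public:
    template <class T>
    static ErasedPublisher create(std::string typeName) {
        return ErasedPublisher(std::type_index(typeid(T)), std::move(typeName));
    }

    // Accepts any type at compile time, rejects mismatches at run time.
    template <class T>
    void publish(const T& /*sample*/) {
        if (std::type_index(typeid(T)) != type_)
            throw std::invalid_argument("publish: sample is not a " + typeName_);
        ++sent_;  // a real publisher would serialize and send here
    }

    const std::string& topicType() const { return typeName_; }
    int sent() const { return sent_; }

private:
    ErasedPublisher(std::type_index type, std::string typeName)
        : type_(type), typeName_(std::move(typeName)) {}

    std::type_index type_;   // remembered at creation, checked on publish
    std::string typeName_;
    int sent_ = 0;
};
```

Because the type is recorded once at creation, every publisher has the same C++ type and can be stored in ordinary containers, at the cost of moving the type check from compile time to run time.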
Let's Talk also supports different "Quality of Service" (QoS) settings for publishers and subscribers. An optional string argument to advertise() and subscribe() gives the name of the QoS profile to use. For example,
will set the QoS to the bulk mode. See below for a full description of QoS settings in Let's Talk.
In request/reply, the "replier" provides a service that the "requester" accesses. A service is defined by a string name, a request type, and a reply type.
On the server side, Let's Talk provides two forms: a "push" form that uses a callback and a "pull" form giving you more control.
The push form calls a callback on a special worker thread to perform the service work. Callbacks take the form
and the registration call on a participant pointer node looks like
Note that you may throw exceptions in the callback to signal failure. The failure of the request (though not the specific error) will be forwarded to the requester. This form is intended to be very simple to use, but you surrender control over the threading.
The pull form allows you full control over the threading of the service, but at the cost of additional complexity. This form creates a Replier object
You can obtain pending sessions from this object
specifying a wait time (10 ms here). The session object has a very simple API:
You can inspect the request data, provide a reply (closing the session), signal failure (also closing the session), or check if the session is live. If no request arrives in the wait time of getPendingSession(), the returned Session object will not be alive. You can also attach replier objects to a Waitset to wait on multiple services in one thread.
The client side also has two options. Simple requests may be made directly on the participant via
Making a request returns a std::future of the reply type. You may block waiting on that future immediately, or wait as much as you can afford and come back to the future later.
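The sketch below shows both ways of consuming such a future using only the standard library: waiting for a bounded time, and guarding get() with try/catch because a failed request surfaces as a std::runtime_error. The function fakeRequest() is a hypothetical stand-in for a real request call, and Outcome and collect() are names invented for this example.

```cpp
#include <chrono>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

enum class Outcome { Pending, Ok, Failed };

// Hypothetical stand-in for a request: hands back a future of the reply type.
std::future<std::string> fakeRequest(bool fail) {
    std::promise<std::string> reply;
    if (fail)
        reply.set_exception(std::make_exception_ptr(std::runtime_error("request failed")));
    else
        reply.set_value("pong");
    return reply.get_future();
}

// Wait at most `budget` for the reply, then collect it if it is ready.
Outcome collect(std::future<std::string>& reply, std::chrono::milliseconds budget,
                std::string& out) {
    if (reply.wait_for(budget) != std::future_status::ready)
        return Outcome::Pending;  // not ready yet: do other work and come back
    try {
        out = reply.get();        // rethrows the stored exception on failure
        return Outcome::Ok;
    } catch (const std::runtime_error&) {
        return Outcome::Failed;
    }
}
```

A caller that cannot afford to block can call collect() with a small budget inside its main loop and keep the future around until the outcome is no longer Pending.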
This form is somewhat less efficient on the first call, as all of the discovery occurs while you wait, but the back-end is stored in the participant so that subsequent requests will be faster. You may call unsubscribe() on the participant to delete the standing request subscriptions if you are finished with a service.
Alternatively, you can first create a Requester object
The requester can be used to make multiple requests, check for connectivity, and check for "impostor" services (more below). Creating it performs all of the discovery tasks that can be time-consuming. The requester API is straightforward:
Two warnings about request/reply:
If a request fails, the std::future will throw a std::runtime_error when get() is called. You should use try/catch if the service you are calling may signal requests as failed.
Use impostorsExist() to check for "impostor" services.
The Reactor pattern is similar to the request/reply pattern and uses a pull-style API with session objects, but with more features. To provide a reactor service, we first create the server object,
Here, ProgressType is optional; if your service doesn't provide progress data you can omit the argument. ReactorServer objects are lightweight and may be copied cheaply. To see if clients have connected,
and to check for pending sessions,
If this is true, a request has been received. To service it, we obtain a Session instance,
This takes an optional wait time if you want to have a blocking wait for sessions. You may also attach a ReactorServer instance to a Waitset just as with queues and Repliers. Like the server object, the session is lightweight. The session provides accessors for the request data
As you process the request, you can send back progress reports via
If you didn't specify a ProgressData type, you may still send progress marks with
The progress mark integer uses values from 1 to 100, with 1 being a special value for "started" and 100 signaling completion. The ReactorServer will automatically send these when you start and finish a session. Note you may send duplicate progress marks, or even have progress decreasing. To signal failure, motionSession.fail() will dispose of the session, notifying the client. To finish a session, send the reply
Additionally, the client may cancel a session at any time. You can check if the session has been canceled by calling isAlive().
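As a small illustration of the 1 to 100 progress scale, here is one way to turn "items done out of total" into a progress mark. The helper progressMark() is hypothetical and not part of Let's Talk. Keeping intermediate marks within 2 to 99, so that 1 and 100 stay reserved for the marks the server sends automatically, is a choice made for this sketch rather than a rule stated by the library.

```cpp
#include <cstddef>

// Map completed work onto the progress scale, clamped to [2, 99] so the
// "started" (1) and "complete" (100) values are left to the server.
int progressMark(std::size_t done, std::size_t total) {
    if (total == 0) return 2;         // nothing to measure against yet
    if (done > total) done = total;   // tolerate over-counting
    const int percent = static_cast<int>(done * 100 / total);
    if (percent < 2) return 2;
    if (percent > 99) return 99;
    return percent;
}
```

A service loop would compute the mark after each unit of work, check that the session is still alive, and only then send the mark to the client.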
The client end is likewise similar to the request client. First, we create a client object on the participant:
We can then check for connections with motionClient.discoveredServer(). Sending a request starts a session
The session API provides calls to determine if the session is alive (started by the server), get the current progress, get a progress data sample, or await the final reply. There's also a cancel() call to end the session early.
The examples directory contains demonstration programs for these three patterns, as well as sample cmake files. To build the examples, first build and install Let's Talk, then create a symlink to the install directory in the examples directory:
Let's Talk inspects several environment variables so that programs can easily modify the behavior at runtime.
LT_VERBOSE – enables debug print messages about discovery and message passing
LT_LOCAL_ONLY – prevents discovery from finding participants on another host
LT_PROFILE – Path to custom QoS profile XML file
To use this on your program foo, you can launch foo from the shell like this:
QoS determines the reliability of message passing. Let's Talk defines three levels of service that are always available:
To use a different QoS from "reliable," pass the QoS string name to the subscribe or advertise method.
In addition, Participants may have QoS profiles. These are used to alter the underlying protocol from UDP to TCP or something else. Currently only UDP profiles are available in the built-in QoS.
If you wish to develop your own QoS profiles, see https://fast-dds.docs.eprosima.com/en/latest/fastdds/xml_configuration/xml_configuration.html
When invoking your program, set the environment variable LT_PROFILE to the path to your xml. The profile_name attribute may be used as the optional argument for subscribe and advertise to use these QoS settings.
Let's Talk is based on DDS, a publish/subscribe messaging system. Here's a small primer on DDS concepts.
Each node in the DDS network is a "participant." Participants discover each other, trading information on available topics and types. Participants also function as factories for the other objects – topic objects, types, publishers, and subscribers.
A topic is a channel for data. It's the combination of a string topic name and a data type. The types generally must be derived from IDL.
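The matching rule (a publisher and a subscriber connect only when both the topic name and the type agree) can be written down in a few lines. TopicInfo and matches() are hypothetical names for this sketch; DDS implementations perform this check during discovery with their own data structures.

```cpp
#include <string>

struct TopicInfo {
    std::string name;      // e.g. "robot.motion.command"
    std::string typeName;  // name of the IDL-derived data type
};

// Endpoints are matched only when both the name and the type agree.
bool matches(const TopicInfo& publisher, const TopicInfo& subscriber) {
    return publisher.name == subscriber.name &&
           publisher.typeName == subscriber.typeName;
}
```

A practical consequence is that reusing a topic name with a different type does not produce garbled data; the endpoints simply never connect.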
Types in DDS are typically derived from IDL source. IDL provides a C-like language for describing structured data. The IDL compiler produces C++ source code from these files that includes serialization and deserialization methods to/from the Common Data Representation (CDR). DDS automatically performs the required serialization and deserialization.
An overloaded term, QoS refers to all of the run-time settings available in DDS. It includes the network protocol (TCP, UDP, shared memory), the error handling strategy, the depth of message history that is stored, and many other details.
The Data Distribution Service is an efficient and powerful publish/subscribe framework, but it is very complicated. It's worth exploring the chain of acronyms that make it up:
request method to avoid explicitly handling Requester objects.
ReactorServer sessions and Replier sessions. Renamed the object from QueueWaitset to Waitset.
toJson() methods to idl-generated C++ code.
foonathan::memory default setting that was causing crashes in examples.
Initial release. Covers all basic functionality.
FetchContent in cmake