Basic usage demo

The reader is assumed to have read the UAVCAN v1 specification beforehand. See uavcan.org for details.

Custom data types

The demo relies on two vendor-specific data types located in the root namespace sirius_cyber_corp. The root namespace directory layout is as follows:

sirius_cyber_corp/                              <-- root namespace directory
    PerformLinearLeastSquaresFit.1.0.uavcan     <-- service type definition
    PointXY.1.0.uavcan                          <-- nested message type definition

If this doesn’t look familiar, please read the UAVCAN specification first. The referenced DSDL definitions are provided below.

sirius_cyber_corp.PerformLinearLeastSquaresFit.1.0:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
#
# This service accepts a dynamic array of 2D coordinates and returns the best-fit linear function coefficients.
#
# This service doesn't have a fixed subject-ID.
#

PointXY.1.0[<64] points

@assert _offset_ % 8 == {0}

---

float64 slope
float64 y_intercept

sirius_cyber_corp.PointXY.1.0:

1
2
3
4
5
6
#
# This nested type contains 2D point coordinates.
#

float16 x
float16 y

Source code

The demo relies on the custom data types presented above. In order to run the demo, please copy-paste its source code into a file on your computer and update the DSDL paths to match your environment.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
#!/usr/bin/env python3
#
# A basic PyUAVCAN demo. This file is included in the user documentation, please keep it tidy.
#
# Distributed under CC0 1.0 Universal (CC0 1.0) Public Domain Dedication. To the extent possible under law, the
# UAVCAN Development Team has waived all copyright and related or neighboring rights to this work.
#

import os
import sys
import typing
import pathlib
import asyncio
import tempfile
import importlib
import pyuavcan
# Explicitly import transports and media sub-layers that we may need here.
import pyuavcan.transport.can
import pyuavcan.transport.can.media.socketcan
import pyuavcan.transport.serial
import pyuavcan.transport.udp
import pyuavcan.transport.redundant

# We will need a directory to store the generated Python packages in.
#
# It is perfectly acceptable to just use a random temp directory at every run, but the disadvantage of that approach
# is that the packages will be re-generated from scratch every time the program is started, which may be undesirable.
#
# So in this example we select a fixed temp dir name (make sure it's unique enough) and shard its contents by the
# library version. The sharding helps us ensure that we won't attempt to use a package generated for an older library
# version with a newer one, as they may be incompatible.
#
# Another sensible location for the generated package directory is somewhere in the application data directory,
# like "~/.my-app/dsdl/{pyuavcan.__version__}/"; or, for Windows: "%APPDATA%/my-app/dsdl/{pyuavcan.__version__}/".
dsdl_generated_dir = pathlib.Path(tempfile.gettempdir(), 'dsdl-for-my-program', f'pyuavcan-v{pyuavcan.__version__}')
dsdl_generated_dir.mkdir(parents=True, exist_ok=True)
print('Generated DSDL packages will be stored in:', dsdl_generated_dir, file=sys.stderr)

# We will need to import the packages once they are generated, so we should update the module import look-up path set.
# If you're using an IDE for development, add this path to its look-up set as well for code completion to work.
sys.path.insert(0, str(dsdl_generated_dir))

# Now we can import our packages. If import fails, invoke the code generator, then import again.
try:
    import sirius_cyber_corp     # This is our vendor-specific root namespace. Custom data types.
    import pyuavcan.application  # The application module requires the standard types from the root namespace "uavcan".
except (ImportError, AttributeError):
    script_path = os.path.abspath(os.path.dirname(__file__))
    # Generate our vendor-specific namespace. It may make use of the standard data types (most namespaces do,
    # because the standard root namespace contains important basic types), so we include it in the lookup path set.
    # The paths are hard-coded here for the sake of conciseness.
    pyuavcan.dsdl.generate_package(
        root_namespace_directory=os.path.join(script_path, '../dsdl/namespaces/sirius_cyber_corp/'),
        lookup_directories=[os.path.join(script_path, '../public_regulated_data_types/uavcan')],
        output_directory=dsdl_generated_dir,
    )
    # Generate the standard namespace. The order actually doesn't matter.
    pyuavcan.dsdl.generate_package(
        root_namespace_directory=os.path.join(script_path, '../public_regulated_data_types/uavcan'),
        output_directory=dsdl_generated_dir,
    )
    # Okay, we can try importing again. We need to clear the import cache first because Python's import machinery
    # requires that; see the docs for importlib.invalidate_caches() for more info.
    importlib.invalidate_caches()
    import sirius_cyber_corp
    import pyuavcan.application

# Import other namespaces we're planning to use. Nested namespaces are not auto-imported, so in order to reach,
# say, "uavcan.node.Heartbeat", you have to do "import uavcan.node".
import uavcan.node                      # noqa E402
import uavcan.diagnostic                # noqa E402
import uavcan.si.sample.temperature     # noqa E402


class DemoApplication:
    def __init__(self):
        # The interface to run the demo against is selected via the environment variable with a default option provided.
        # Virtual CAN bus is supported only on GNU/Linux, but other interfaces used here should be compatible
        # with at least Windows as well.
        # Frankly, the main reason we need this here is to simplify automatic testing of this demo script.
        # Feel free to remove the selection logic and just hard-code whatever interface you need.
        interface_kind = os.environ.get('DEMO_INTERFACE_KIND', '').lower()
        # The node-ID is configured per transport instance.
        # Some transports (e.g., UDP/IP) derive the node-ID value from the configuration of the underlying layers.
        # Other transports (e.g., CAN or serial) must be provided with the node-ID value explicitly during
        # initialization, or None can be used to select the anonymous mode.
        # Some of the protocol features are unavailable in the anonymous mode (read Specification for more info).
        # For example, anonymous node cannot be a server, since without an ID it cannot be addressed.
        # Here, we assign a node-ID statically, because this is a simplified demo.
        # Most applications would need this to be configurable, some may support the PnP node-ID allocation protocol.
        if interface_kind == 'udp' or not interface_kind:  # This is the default.
            # The UDP/IP transport in this example runs on the local loopback interface, so no setup is needed.
            # The UDP transport requires us to assign the IP address; the node-ID equals the value of several least
            # significant bits of its IP address. If you want an anonymous UDP/IPv4 node, just use the subnet's
            # broadcast address as its local IP address (e.g., 127.255.255.255/8, 192.168.0.255/24, and so on).
            # For more info, please read the API documentation.
            transport = pyuavcan.transport.udp.UDPTransport('127.0.0.42/8')

        elif interface_kind == 'serial':
            # For demo purposes we're using not an actual serial port (which could have been specified like "COM9"
            # for example) but a virtualized TCP/IP tunnel. The background is explained in the API documentation
            # for the serial transport, please read that. For a quick start, just install Ncat (part of Nmap) and run:
            #   ncat --broker --listen -p 50905
            transport = pyuavcan.transport.serial.SerialTransport('socket://localhost:50905', local_node_id=42)

        elif interface_kind == 'can':
            # Make sure to initialize the virtual CAN interface. For example (run as root):
            #   modprobe vcan
            #   ip link add dev vcan0 type vcan
            #   ip link set vcan0 mtu 72
            #   ip link set up vcan0
            # CAN interfaces can me monitored using can-utils:
            #   candump -decaxta any
            # Here we select Classic CAN by setting MTU=8 bytes. We can switch to CAN FD by simply increasing the MTU.
            media = pyuavcan.transport.can.media.socketcan.SocketCANMedia('vcan0', mtu=8)
            transport = pyuavcan.transport.can.CANTransport(media, local_node_id=42)

        elif interface_kind == 'can_can_can':
            # One of the selling points of UAVCAN is the built-in support for modular redundancy.
            # In this section, we set up a triply modular redundant (TMR) CAN bus.
            transport = pyuavcan.transport.redundant.RedundantTransport()
            # Like vcan0, this case requires vcan1 and vcan2 to be available as well.
            media_0 = pyuavcan.transport.can.media.socketcan.SocketCANMedia(f'vcan0', mtu=8)
            media_1 = pyuavcan.transport.can.media.socketcan.SocketCANMedia(f'vcan1', mtu=32)
            media_2 = pyuavcan.transport.can.media.socketcan.SocketCANMedia(f'vcan2', mtu=64)
            # All transports in a redundant group MUST share the same node-ID.
            transport.attach_inferior(pyuavcan.transport.can.CANTransport(media_0, local_node_id=42))
            transport.attach_inferior(pyuavcan.transport.can.CANTransport(media_1, local_node_id=42))
            transport.attach_inferior(pyuavcan.transport.can.CANTransport(media_2, local_node_id=42))
            assert len(transport.inferiors) == 3  # Yup, it's a triply redundant transport.

        elif interface_kind == 'udp_serial':
            # UAVCAN supports dissimilar transport redundancy for safety-critical/high-reliability systems.
            # In this example, we set up a transport that operates over UDP and serial concurrently.
            # This is just an example, however. Major advantages of dissimilar redundant architectures
            # may be observed with wired+wireless links used concurrently; see https://forum.uavcan.org/t/557.
            # All transports in a redundant group MUST share the same node-ID.
            transport = pyuavcan.transport.redundant.RedundantTransport()
            transport.attach_inferior(pyuavcan.transport.udp.UDPTransport('127.0.0.42/8'))
            transport.attach_inferior(pyuavcan.transport.serial.SerialTransport('socket://localhost:50905',
                                                                                local_node_id=42))

        else:
            raise RuntimeError(f'Unrecognized interface kind: {interface_kind}')  # pragma: no cover

        assert transport.local_node_id == 42  # Yup, the node-ID is configured.

        # Populate the node info for use with the Node class. Please see the DSDL definition of uavcan.node.GetInfo.
        node_info = uavcan.node.GetInfo_1_0.Response(
            # Version of the protocol supported by the library, and hence by our node.
            protocol_version=uavcan.node.Version_1_0(*pyuavcan.UAVCAN_SPECIFICATION_VERSION),
            # There is a similar field for hardware version, but we don't populate it because it's a software-only node.
            software_version=uavcan.node.Version_1_0(major=1, minor=0),
            # The name of the local node. Should be a reversed Internet domain name, like a Java package.
            name='org.uavcan.pyuavcan.demo.basic_usage',
            # We've left the optional fields default-initialized here.
        )

        # The transport layer is ready; next layer up the protocol stack is the presentation layer. Construct it here.
        presentation = pyuavcan.presentation.Presentation(transport)

        # The application layer is next -- construct the node instance. It will serve GetInfo requests and publish its
        # heartbeat automatically (unless it's anonymous). Read the source code of the Node class for more details.
        self._node = pyuavcan.application.Node(presentation, node_info)

        # Published heartbeat fields can be configured trivially by assigning them on the heartbeat publisher instance.
        self._node.heartbeat_publisher.mode = uavcan.node.Heartbeat_1_0.MODE_OPERATIONAL
        # In this example here we assign the local process' PID to the vendor-specific status code (VSSC) and make
        # sure that the valid range is not exceeded.
        self._node.heartbeat_publisher.vendor_specific_status_code = \
            os.getpid() & (2 ** min(pyuavcan.dsdl.get_model(uavcan.node.Heartbeat_1_0)[
                'vendor_specific_status_code'].data_type.bit_length_set) - 1)

        # Now we can create our session objects as necessary. They can be created or destroyed later at any point
        # after initialization. It's not necessary to set everything up during the initialization.
        srv_least_squares = self._node.presentation.get_server(sirius_cyber_corp.PerformLinearLeastSquaresFit_1_0, 123)
        # Will run until self._node is close()d:
        srv_least_squares.serve_in_background(self._serve_linear_least_squares_request)

        # Create another server using shorthand for fixed port ID. We could also use it with an application-specific
        # service-ID as well, of course:
        #   get_server(uavcan.node.ExecuteCommand_1_0, 42).serve_in_background(self._serve_execute_command)
        # If the transport does not yet have a node-ID, the server will stay idle until a node-ID is assigned
        # because the node won't be able to receive unicast transfers carrying service requests.
        self._node.presentation.get_server_with_fixed_service_id(
            uavcan.node.ExecuteCommand_1_0
        ).serve_in_background(self._serve_execute_command)

        # We'll be publishing diagnostic messages using this publisher instance. The method we use is a shortcut for:
        #   make_publisher(uavcan.diagnostic.Record_1_0, pyuavcan.dsdl.get_fixed_port_id(uavcan.diagnostic.Record_1_0))
        self._pub_diagnostic_record = \
            self._node.presentation.make_publisher_with_fixed_subject_id(uavcan.diagnostic.Record_1_0)
        self._pub_diagnostic_record.priority = pyuavcan.transport.Priority.OPTIONAL
        self._pub_diagnostic_record.send_timeout = 2.0

        # A message subscription.
        self._sub_temperature = self._node.presentation.make_subscriber(uavcan.si.sample.temperature.Scalar_1_0, 12345)
        self._sub_temperature.receive_in_background(self._handle_temperature)

        # When all is initialized, don't forget to start the node!
        self._node.start()

    async def _serve_linear_least_squares_request(self,
                                                  request:  sirius_cyber_corp.PerformLinearLeastSquaresFit_1_0.Request,
                                                  metadata: pyuavcan.presentation.ServiceRequestMetadata) \
            -> typing.Optional[sirius_cyber_corp.PerformLinearLeastSquaresFit_1_0.Response]:
        """
        This is the request handler for the linear least squares service. The request is passed in along with its
        metadata (the second argument); the response is returned back. We can also return None to instruct the library
        that this request need not be answered (as if the request was never received).
        If this handler raises an exception, it will be suppressed and logged, and no response will be sent back.
        Notice that this is an async function.
        """
        # Publish the message asynchronously using publish_soon() because we don't want to block the service handler.
        diagnostic_msg = uavcan.diagnostic.Record_1_0(
            severity=uavcan.diagnostic.Severity_1_0(uavcan.diagnostic.Severity_1_0.DEBUG),
            text=f'Least squares request from {metadata.client_node_id} time={metadata.timestamp.system} '
                 f'tid={metadata.transfer_id} prio={metadata.priority}',
        )
        print('Least squares request:', request, file=sys.stderr)
        self._pub_diagnostic_record.publish_soon(diagnostic_msg)

        # This is just the business logic.
        sum_x = sum(map(lambda p: p.x, request.points))
        sum_y = sum(map(lambda p: p.y, request.points))
        a = sum_x * sum_y - len(request.points) * sum(map(lambda p: p.x * p.y, request.points))
        b = sum_x * sum_x - len(request.points) * sum(map(lambda p: p.x ** 2, request.points))
        try:
            slope = a / b
            y_intercept = (sum_y - slope * sum_x) / len(request.points)
        except ZeroDivisionError:
            # The method "publish_soon()" launches a background task instead of waiting for the operation to complete.
            self._pub_diagnostic_record.publish_soon(uavcan.diagnostic.Record_1_0(
                severity=uavcan.diagnostic.Severity_1_0(uavcan.diagnostic.Severity_1_0.WARNING),
                text=f'There is no solution for input set: {request.points}',
            ))
            # We return None, no response will be sent back. This practice is actually discouraged; we do it here
            # only to demonstrate the library capabilities.
            return None
        else:
            self._pub_diagnostic_record.publish_soon(uavcan.diagnostic.Record_1_0(
                severity=uavcan.diagnostic.Severity_1_0(uavcan.diagnostic.Severity_1_0.INFO),
                text=f'Solution for {",".join(f"({p.x},{p.y})" for p in request.points)}: {slope}, {y_intercept}',
            ))
            return sirius_cyber_corp.PerformLinearLeastSquaresFit_1_0.Response(slope=slope, y_intercept=y_intercept)

    async def _serve_execute_command(self,
                                     request:  uavcan.node.ExecuteCommand_1_0.Request,
                                     metadata: pyuavcan.presentation.ServiceRequestMetadata) \
            -> uavcan.node.ExecuteCommand_1_0.Response:
        """
        This is another service handler, like the other one.
        """
        print(f'EXECUTE COMMAND REQUEST {request} (with metadata {metadata})')

        if request.command == uavcan.node.ExecuteCommand_1_0.Request.COMMAND_POWER_OFF:
            async def do_delayed_shutdown() -> None:
                await asyncio.sleep(1.0)
                # This will close the underlying presentation, transport, and media layer resources.
                # All pending tasks such as serve_in_background() will notice this and exit automatically.
                # This is convenient as it relieves the application from having to keep track of all objects.
                self._node.close()

            asyncio.ensure_future(do_delayed_shutdown())  # Delay shutdown to let the transport emit the response.
            return uavcan.node.ExecuteCommand_1_0.Response(uavcan.node.ExecuteCommand_1_0.Response.STATUS_SUCCESS)

        elif request.command == 23456:
            # This is a custom application-specific command. Just print the string parameter and do nothing.
            parameter_text = request.parameter.tobytes().decode(errors='replace')
            print('CUSTOM COMMAND PARAMETER:', parameter_text)
            return uavcan.node.ExecuteCommand_1_0.Response(uavcan.node.ExecuteCommand_1_0.Response.STATUS_SUCCESS)

        else:
            # Command not supported.
            return uavcan.node.ExecuteCommand_1_0.Response(uavcan.node.ExecuteCommand_1_0.Response.STATUS_BAD_COMMAND)

    async def _handle_temperature(self,
                                  msg:      uavcan.si.sample.temperature.Scalar_1_0,
                                  metadata: pyuavcan.transport.TransferFrom) -> None:
        """
        A subscription message handler. This is also an async function, so we can block inside if necessary.
        The received message object is passed in along with the information about the transfer that delivered it.
        """
        print('TEMPERATURE', msg.kelvin - 273.15, 'C')

        # Publish the message synchronously, using await, blocking this task until the message is pushed down to
        # the media layer.
        if not await self._pub_diagnostic_record.publish(uavcan.diagnostic.Record_1_0(
            severity=uavcan.diagnostic.Severity_1_0(uavcan.diagnostic.Severity_1_0.TRACE),
            text=f'Temperature {msg.kelvin:0.3f} K from {metadata.source_node_id} '
                 f'time={metadata.timestamp.system} tid={metadata.transfer_id} prio={metadata.priority}',
        )):
            print('Diagnostic message could not be sent in', self._pub_diagnostic_record.send_timeout, 'seconds',
                  file=sys.stderr)


if __name__ == '__main__':
    app = DemoApplication()
    app_tasks = asyncio.Task.all_tasks()

    async def list_tasks_periodically() -> None:
        """Print active tasks periodically for demo purposes."""
        import re

        def repr_task(t: asyncio.Task) -> str:
            try:
                out, = re.findall(r'^<([^<]+<[^>]+>)', str(t))
            except ValueError:
                out = str(t)
            return out

        while True:
            print('\nActive tasks:\n' + '\n'.join(map(repr_task, asyncio.Task.all_tasks())), file=sys.stderr)
            await asyncio.sleep(10)

    asyncio.get_event_loop().create_task(list_tasks_periodically())

    # The node and PyUAVCAN objects have created internal tasks, which we need to run now.
    # In this case we want to automatically stop and exit when no tasks are left to run.
    asyncio.get_event_loop().run_until_complete(asyncio.gather(*app_tasks))

Evaluating the demo using the command-line tool

Generating data type packages from DSDL

First, we need to make sure that the required DSDL-generated packages are available for the command-line tool. Suppose that the application-specific data types listed above are located at ../dsdl/namespaces/, and that instead of using a local copy of the public regulated data types we prefer to download them from the repository. This is the command:

uvc dsdl-gen-pkg ../dsdl/namespaces/sirius_cyber_corp/ https://github.com/UAVCAN/public_regulated_data_types/archive/master.zip

That’s it. The DSDL-generated packages have been stored in the current working directory, so now we can use them. If you decided to change the working directory, please make sure to update the PYTHONPATH environment variable to include the path where the generated packages are stored, otherwise you won’t be able to import them. Alternatively, you can just move the generated packages to a new location (they are location-invariant) or just generate them anew where needed.

If you want to know what exactly has been done, rerun the command with -v (V for Verbose). As always, use --help to get the full usage information.

Configuring the transport

The commands shown later have to be instructed to use the same transport interface as the demo. Please use one of the following options depending on your demo configuration:

  • --tr="UDP('127.0.0.111/8')" – UDP/IP transport on localhost. Local node-ID 111.

  • --tr="Serial('socket://loopback:50905',111)" – serial transport emulated over a TCP/IP tunnel instead of a real serial port (use Ncat for TCP connection brokering). Local node-ID 111.

  • --tr="CAN(can.media.socketcan.SocketCANMedia('vcan0',8),111)" – virtual CAN bus via SocketCAN (GNU/Linux systems only). Local node-ID 111.

Redundant transports can be configured by specifying the --tr option more than once:

  • --tr="UDP('127.0.0.111/8')" --tr="Serial('socket://loopback:50905',111)" – dissimilar double redundancy, UDP plus serial.

  • --tr="CAN(can.media.socketcan.SocketCANMedia('vcan0',8),111)" --tr="CAN(can.media.socketcan.SocketCANMedia('vcan1',32),111)" --tr="CAN(can.media.socketcan.SocketCANMedia('vcan2',64),111)" – triple redundant CAN bus, classic CAN with CAN FD.

Running the application

Start the demo application shown above and leave it running. In a new terminal, run the following commands to listen to the demo’s heartbeat or its diagnostics (don’t forget to specify transport):

uvc sub uavcan.node.Heartbeat.1.0 --with-metadata --count=3
uvc sub uavcan.diagnostic.Record.1.0 --with-metadata

The latter may not output anything because the demo application is not doing anything interesting, so it has nothing to report. Keep the command running, and open a yet another terminal, whereat run this:

uvc call 42 123.sirius_cyber_corp.PerformLinearLeastSquaresFit.1.0 '{points: [{x: 10, y: 1}, {x: 20, y: 2}]}'

Once you’ve executed the last command, you should see a diagnostic message being emitted in the other terminal. Now let’s publish temperature:

uvc pub 12345.uavcan.si.sample.temperature.Scalar.1.0 '{kelvin: 123.456}' --count=2

You will see the demo application emit two more diagnostic messages.