Protection class: a solution resolving I2C communication issues in complex systems

Author: Vid Rajtmajer, student intern at IRNAS

Introduction

The failures discussed here are random, hard to reproduce, and beyond the reach of the systematic testing and validation that are part of our standard practice. In this blog, we describe how we solved one such failure, related to I2C communication, that we encountered on an irregular basis. A practice we have adopted is to break the system down into standalone modules and isolate their functionality, so that a single faulty component does not break the complete system.

The Basic Software

  1. Drivers
  2. Wrappers
  3. Main Class

We won’t go into too much detail here, but let’s briefly classify the layers. The drivers communicate with the motors and sensors over I2C, so their classes contain all the functions needed to read and write bytes to registers and to perform basic parsing of results (from bytes to numbers and similar).

Wrappers are the middle layer. They provide application-specific functions and communicate with the drivers. We use them to separate application actions from device functions on one side and from the main code on the other. They also prevent the main code from calling the drivers directly.

The top layer is the main class, which actually does what the system is supposed to do: it initialises everything, runs the desired procedures, writes logs, etc. Its work is split into threads.
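To make the layering concrete, here is a minimal sketch of a driver and a wrapper, with the last lines standing in for the main code. Everything in it is hypothetical and only for illustration: the FakeBus class, the temperature sensor, its register map, its address 0x48 and its scaling are assumptions, not part of the actual system.

```python
class FakeBus:
    """Hypothetical in-memory stand-in for an I2C bus, so the sketch runs anywhere."""

    def __init__(self):
        self.registers = {}  # (address, register) -> byte value

    def write_byte(self, address, register, value):
        self.registers[(address, register)] = value & 0xFF

    def read_byte(self, address, register):
        return self.registers.get((address, register), 0)


class TemperatureDriver:
    """Driver layer: raw register access and byte-to-number parsing."""

    REG_TEMP_MSB = 0x00  # hypothetical register map
    REG_TEMP_LSB = 0x01

    def __init__(self, bus, address):
        self.bus = bus
        self.address = address

    def read_raw(self):
        msb = self.bus.read_byte(self.address, self.REG_TEMP_MSB)
        lsb = self.bus.read_byte(self.address, self.REG_TEMP_LSB)
        return (msb << 8) | lsb  # combine two bytes into one number


class TemperatureWrapper:
    """Wrapper layer: application-specific functions built on the driver."""

    def __init__(self, driver):
        self._driver = driver  # the main code never touches this directly

    def temperature_celsius(self):
        # assumed scaling for this sketch: 1 LSB = 0.01 degrees Celsius
        return self._driver.read_raw() / 100


# "Main code": only talks to the wrapper.
bus = FakeBus()
bus.write_byte(0x48, 0x00, 0x09)
bus.write_byte(0x48, 0x01, 0xC4)
wrapper = TemperatureWrapper(TemperatureDriver(bus, 0x48))
reading = wrapper.temperature_celsius()
```

The main code only ever sees temperature_celsius(), so the register layout can change without touching it.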

The I2C Failure

To tackle I2C communication stability, we’ve created a class which we called the Protection class.

The Protection Class

The protection class is a singleton with attributes switch, line and address.

def __init__(self, switch, line, address):
    """Init the class."""
    Protection.switch = switch  # switch wrapper class
    Protection.line = line  # on which switch line the wrapper class is
    Protection.address = address  # on which i2c address the device is

All function calls from the wrapper go into protection and are forwarded to drivers. It works like this:

  1. The wrapper wants to call a driver function. The call is forwarded to the protection class, which calls the desired driver function.
  2. The driver returns the result to the protection class, depending on what happened to the write or read operation: successful (including actual device data, if any), unsuccessful, or timed out.
  3. The protection class then decides how to proceed. If the operation was successful, it returns the result (including data) to the wrapper class and finishes. In any other case, it proceeds with resolving.
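The forwarding step can be sketched as follows. The name perform_function and its "None means failure" convention come from the snippets in this post; its body, the call entry point and the choice of OSError as the failure signal are assumptions made for this sketch, not the production code.

```python
import logging


class Protection:
    """Sketch of the forwarding step only; the resolving steps are left out."""

    @staticmethod
    def perform_function(function, args):
        """Call the driver function; return its result, or None on failure."""
        try:
            return function(*args)
        except OSError as error:  # assumption: I2C failures surface as OSError
            logging.error("Driver call failed: %s", error)
            return None

    @staticmethod
    def call(function, *args):
        """Hypothetical entry point used by the wrappers."""
        fun_ret = Protection.perform_function(function, args)
        if fun_ret is not None:
            return fun_ret  # success: hand the result back to the wrapper
        # the resolving steps would start here
        return None


def read_ok(register):
    """Hypothetical driver function that succeeds."""
    return register + 1


def read_broken(register):
    """Hypothetical driver function that fails on the bus."""
    raise OSError("Remote I/O error")


ok_result = Protection.call(read_ok, 0x10)
failed_result = Protection.call(read_broken, 0x10)
```

One consequence of this convention is that a driver function has to return something other than None on success, for example True for a write, otherwise a successful call would be treated as a failure.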

After each resolving step, it goes back to point 2 to check if the issue was resolved. The resolving steps are the following:

  • First, it retries the call a user-configurable number of times:

# 1. retry a few times
logging.info("Retrying the call...")
for _ in range(RETRY_COUNT):
    fun_ret = Protection.perform_function(function, args)
    if fun_ret is not None:  # call went ok, return result of actual function
        logging.info("Retry call went ok, returning actual result.")
        return fun_ret
    time.sleep(0.05)

  • Then it performs the I2C line test.

The I2C line test checks all the I2C lines in the MUX/switch. The MUX is the only device that the main controller/processor has a direct I2C connection with. All other devices are connected to this switch, each one on its own I2C line. The MUX driver in this case lives in a separate switch class.
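For readers who want to experiment without hardware, the switch class can be approximated by an in-memory stand-in. The four method names are the ones the protection class calls in this post; their bodies, return values and the FakeSwitch class itself are assumptions for illustration only.

```python
class FakeSwitch:
    """In-memory stand-in for the switch class (hypothetical, for illustration)."""

    def __init__(self, devices):
        # devices: switch line number -> I2C address of the device on that line
        self.devices = dict(devices)
        self.enabled = {line: False for line in self.devices}

    def get_all_channels(self):
        """Return a snapshot of all line states."""
        return dict(self.enabled)

    def set_channel(self, line, state):
        """Enable (1) or disable (0) one line; return True on success."""
        if line not in self.enabled:
            return False
        self.enabled[line] = bool(state)
        return True

    def set_all_channels(self, states):
        """Restore a snapshot taken with get_all_channels()."""
        return all(self.set_channel(line, state) for line, state in states.items())

    def get_active_i2c_ports(self):
        """Return the addresses of devices visible through enabled lines."""
        return [self.devices[line] for line, on in self.enabled.items() if on]


switch = FakeSwitch({0: 0x40, 1: 0x48})
switch.set_channel(1, 1)
saved = switch.get_all_channels()  # remember the current line states
switch.set_channel(1, 0)
visible_while_disabled = switch.get_active_i2c_ports()
switch.set_all_channels(saved)  # put everything back
visible_after_restore = switch.get_active_i2c_ports()
```

Because every device sits on its own line, disabling a line makes exactly one device disappear from the bus, which is what allows a misbehaving device to be isolated.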

This enables us to do the following steps in resolving the communication problem:

  • Try to disable the line that the wrapper class wants to use
  • Re-enable the line
  • Check if the device appears on I2C
  • If it doesn’t, disable it and return error to wrapper class
  • If it does, retry the function call
  • If function call was successful, return an actual result and restore the original state of all switch lines
  • If function call was not successful, return error to wrapper class
channels_state = Protection.switch.get_all_channels()  # get current states of all switch lines
if channels_state is not None:
    # try to disable the line that wrapper class uses
    ret_val = Protection.switch.set_channel(Protection.line, 0)
    if not ret_val:
        logging.error("Switch has failed to disable line {}.".format(Protection.line))
    # re enable the line that wrapper class uses
    ret_val = Protection.switch.set_channel(Protection.line, 1)
    if not ret_val:  # break
        logging.error("Switch has failed to enable line {}.".format(Protection.line))
    else:  # continue
        # check if device appears on i2c
        active = Protection.switch.get_active_i2c_ports()
        if not active or Protection.address not in active:  # break
            logging.error("Device did not appear on i2c bus.")
        else:  # continue
            # retry the function call
            fun_ret = Protection.perform_function(function, args)
            if fun_ret is not None:  # call went ok
                logging.info("I2c test retry call went ok, returning actual result.")
                # restore states of all switch lines
                ret_val = Protection.switch.set_all_channels(channels_state)
                if ret_val:  # restore went ok
                    return fun_ret  # return result of actual function
                else:  # restore failed
                    logging.error("Failed to restore states of all switch lines.")
else:
    logging.error("Failed to communicate with switch driver...")
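To try the whole sequence end to end without hardware, the retry and the line test can be combined into one runnable sketch. This is a restructured variant, not the production code: it uses a plain function with early returns instead of nested if/else, and the StubSwitch class, the RETRY_COUNT value, the OSError failure signal and returning None on every failure path are all assumptions.

```python
import logging
import time

RETRY_COUNT = 3  # assumed value; the post only says it is user-configurable


class StubSwitch:
    """Hypothetical switch stand-in: one device at 0x48 on line 1."""

    def __init__(self):
        self.channels = {1: 1}
        self.toggles = 0  # how many times a line was switched

    def get_all_channels(self):
        return dict(self.channels)

    def set_channel(self, line, state):
        self.channels[line] = state
        self.toggles += 1
        return True

    def set_all_channels(self, states):
        self.channels = dict(states)
        return True

    def get_active_i2c_ports(self):
        return [0x48] if self.channels.get(1) else []


def perform_function(function, args):
    """Return the call's result, or None if it raised OSError (assumed signal)."""
    try:
        return function(*args)
    except OSError:
        return None


def resolve(switch, line, address, function, args):
    """Retry first, then cycle the switch line and try once more."""
    for _ in range(RETRY_COUNT):
        fun_ret = perform_function(function, args)
        if fun_ret is not None:
            return fun_ret
        time.sleep(0.05)

    channels_state = switch.get_all_channels()
    if channels_state is None:
        logging.error("Failed to communicate with switch driver.")
        return None
    if not switch.set_channel(line, 0):  # a failed disable is logged, not fatal
        logging.error("Switch has failed to disable line %s.", line)
    if not switch.set_channel(line, 1):
        logging.error("Switch has failed to enable line %s.", line)
        return None
    active = switch.get_active_i2c_ports()
    if not active or address not in active:
        logging.error("Device did not appear on i2c bus.")
        return None
    fun_ret = perform_function(function, args)
    if fun_ret is None:
        return None
    if not switch.set_all_channels(channels_state):
        logging.error("Failed to restore states of all switch lines.")
        return None
    return fun_ret


switch = StubSwitch()


def read_register(register):
    """Hypothetical device that only recovers after its line was cycled."""
    if switch.toggles < 2:
        raise OSError("Remote I/O error")
    return 0x2A


result = resolve(switch, 1, 0x48, read_register, (0x00,))
```

In this run the three retries fail, the line is disabled and re-enabled, the device shows up again, and the final call returns the register value.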

Conclusion

This approach has proven to be a simple yet effective way of tackling I2C communication problems. We are proud to say that, since putting this class into production firmware versions, we haven’t had a single I2C-related issue reported.

We are applying today’s knowledge to create systems for an open future.