Monitor if your systems are up to date with Datadog
03 March 2023
Updating your systems, especially those that are constantly connected to the Internet, is very important from a security standpoint. The current state-of-the-art, cloud-native approach is to use containers and orchestrators such as Docker and Kubernetes. Another option is to use virtual machines and images, built with Packer and deployed with Terraform. However, when you inherit multiple legacy systems that were configured by hand, serve customers, are tightly coupled and have no integration or end-to-end tests, things get more complicated.
To ensure that we patch our systems often enough, we need a list of them with their current state. At first I used a spreadsheet generated with Ansible, but that proved ineffective because it had to be run manually. A much better solution is to use a monitoring service such as Datadog. In this post, I will explain how to create a custom Datadog check in Python.
Repository for this post is available at: https://github.com/ppabis/datadog-system-state.
Boilerplate for a Datadog check
Let's start with a simple file named SystemState.py and put the following
contents:
import os, shutil, socket
from datetime import datetime
from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.subprocess_output import get_subprocess_output
class SystemState(AgentCheck):
    def check(self, instance):
        pass
This simple script does not report anything yet. It imports the necessary
libraries for Datadog integration (namely the AgentCheck class) and some other
utilities we will use later to implement our functions. The SystemState class
extends the basic AgentCheck and overrides its check method, which is
periodically run by the Datadog Agent. self in this context contains the global
check configuration and all the methods to interact with Datadog. instance
contains just the configuration for the current instance of the check - more on
that later on.
To test the check locally and get code suggestions, we can install the Datadog
libraries using pip or by following the official guide from Datadog, which uses
pipx. For the pip method you can install two packages (using a venv is
recommended but not required):
$ pip install datadog-checks-base
$ pip install datadog-checks-dev
Implementing functions for our check
We plan to report the following information:
- currently upgradable packages,
- days since last reboot,
- major version of the OS.
Currently upgradable packages
To detect if there are any packages that can be upgraded, on Debian-based
systems we can use the apt command. We first list the packages that can be
upgraded and then count the number of lines in the output. In bash we do it like
this:
$ apt -qq list --upgradable 2>/dev/null | wc -l
We query the apt cache, use -qq to suppress the extra Listing... line, redirect
stderr to /dev/null to ignore any warnings and count the lines with wc -l. On
RedHat-based systems, we can use the dnf command:
$ dnf -q check-update 2>/dev/null | grep -v ^$ | wc -l
Here we also need grep -v ^$ to remove empty lines from the output, as dnf
adds an empty line at the top of the output (as of Rocky Linux 8.7). You
should test how those commands work on your system beforehand so that the check
is implemented correctly.
So let's convert this to Python now. We will use get_subprocess_output
provided by Datadog to run the command and get the output. We will use Python
built-in functions to do all the needed transformations. To detect the available
package manager, we use shutil.which that returns the path to the executable
if it exists, otherwise it returns None. The function will be a new method in
SystemState class.
class SystemState(AgentCheck):
    def get_upgradable_packages(self):
        if shutil.which("apt") is not None:
            cmd = [ "apt", "-qq", "list", "--upgradable" ] # Debian
        elif shutil.which("dnf") is not None:
            cmd = [ "dnf", "-q", "check-update" ] # RedHat
        else:
            return -1 # Not supported
        out, _, _ = get_subprocess_output( cmd, self.log, raise_on_empty_output=False )
        return len(     # Count the lines (wc -l)
         list(          # Convert to list
          filter(       # Filter out empty lines (grep -v ^$)
           lambda x: len(x) > 0, out.splitlines()
          )
         )
        )
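The line-counting step does not depend on Datadog, so it can be checked on its own. Below is a minimal sketch, assuming a hypothetical helper named count_nonempty_lines (it is not part of the repository) and a made-up sample in the shape that dnf prints:

```python
def count_nonempty_lines(output: str) -> int:
    """Count lines that are not empty, like `grep -v ^$ | wc -l`."""
    return sum(1 for line in output.splitlines() if len(line) > 0)


# Made-up sample output: dnf starts with an empty line, then one line per package.
sample = (
    "\n"
    "bash.x86_64  4.4.20-4.el8_6  baseos\n"
    "curl.x86_64  7.61.1-25.el8_7.3  baseos\n"
)
print(count_nonempty_lines(sample))
```

For the sample above the function counts two packages; the leading empty line is ignored.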
Days since last reboot
Rebooting is routine for cloud-native solutions such as Kubernetes Pods - they can be replaced multiple times a day and microservices are designed to be loosely coupled. However, for software installed directly on long-lived servers, this is often not the case.
To get the boot date and time we can use the who -b or uptime command. The
output can then be parsed with datetime, and the difference between the boot
time and now gives the number of days since the last reboot. Plain uptime
reports the number of days directly (if more than 1). However, we will use
uptime -s in this example because it works across platforms, its format does
not depend on the locale, and it even works in Docker (although you need to
install the procps or procps-ng package first). Because the date format
reported by uptime -s is known and predictable, we can easily parse it in
Python and subtract it from the current date.
class SystemState(AgentCheck):
    def get_days_since_last_reboot(self):
        out, _, _ = get_subprocess_output( [ "uptime", "-s" ], self.log )
        boot_time = datetime.strptime( out.replace("\n", ""), "%Y-%m-%d %H:%M:%S" )
        return ( datetime.now() - boot_time ).days
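The date arithmetic can be tried without calling uptime at all. Below is a minimal sketch, assuming a hypothetical helper days_since that takes the uptime -s string and an explicit "now" so the result is reproducible; the timestamps are made up:

```python
from datetime import datetime


def days_since(boot_output: str, now: datetime) -> int:
    """Return whole days between the `uptime -s` timestamp and `now`."""
    boot_time = datetime.strptime(boot_output.strip(), "%Y-%m-%d %H:%M:%S")
    return (now - boot_time).days


# Made-up values: booted on 1 February, checked on 3 March 2023.
example_now = datetime(2023, 3, 3, 12, 0, 0)
print(days_since("2023-02-01 08:30:00\n", example_now))
```

Note that the days attribute of a timedelta drops the remaining hours, so a host that booted less than 24 hours ago reports 0.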
Major version of the OS
To get the major version of the OS we can use the lsb_release command if it's
available. Otherwise we check if the /etc/lsb-release or /etc/os-release file
exists and parse it.
class SystemState(AgentCheck):
    def get_os_major_version(self):
        version = "-1"
        # Using lsb_release binary if present
        if shutil.which( "lsb_release" ) is not None:
            out, _, _ = get_subprocess_output( [ "lsb_release", "-rs" ], self.log )
            version = out
        # Using /etc/lsb-release file if present
        elif os.path.isfile( "/etc/lsb-release" ):
            with open( "/etc/lsb-release", "r" ) as f:
                for line in f:
                    if line.startswith( "DISTRIB_RELEASE=" ):
                        version = line.split("=")[1]
                        break
        # Using /etc/os-release file if present
        elif os.path.isfile( "/etc/os-release" ):
            with open( "/etc/os-release", "r" ) as f:
                for line in f:
                    if line.startswith( "VERSION_ID=" ):
                        version = line.split("=")[1]
                        break
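        # Drop the trailing newline first, then the optional quotes;
        # otherwise a value such as VERSION_ID="11" fails to convert
        return int( version.strip().strip('"').split(".")[0] )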
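Quoting and trailing newlines are the parts of this parsing that are easy to get wrong, so it helps to try them on plain strings. Below is a minimal sketch, assuming a hypothetical helper major_version_from_os_release that works on the file content as a string; the sample contents are abbreviated and made up:

```python
def major_version_from_os_release(content: str) -> int:
    """Extract the major version from os-release style text, -1 if absent."""
    for line in content.splitlines():
        if line.startswith("VERSION_ID="):
            # Remove surrounding whitespace first, then the optional quotes.
            value = line.split("=", 1)[1].strip().strip('"')
            return int(value.split(".")[0])
    return -1


# Abbreviated, made-up file contents for two distributions.
debian = 'ID=debian\nVERSION_ID="11"\n'
rocky = 'ID="rocky"\nVERSION_ID="8.7"\n'
print(major_version_from_os_release(debian), major_version_from_os_release(rocky))
```

The sketch handles both a version without a minor part ("11") and one with it ("8.7"), and returns -1 when no VERSION_ID line is present.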
Putting it together
So now we are able to use each of the functions we implemented to report the
data we want. We will use self.gauge to report the data. Within the instance
we can add custom parameters such as tags - they will be configured later on
in the YAML file.
import os, shutil, socket
from datetime import datetime
from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.subprocess_output import get_subprocess_output
__version__ = "0.1.0"
class SystemState(AgentCheck):
    #
    # [...] functions from above
    #
    def check(self, instance):
        tags = instance.get('tags', [])
        tags.append(f"hostname:{socket.gethostname()}") # Report hostname as tag
        self.gauge( 'systemstate.upgradable_packages', self.get_upgradable_packages(), tags=tags )
        self.gauge( 'systemstate.days_since_last_reboot', self.get_days_since_last_reboot(), tags=tags )
        self.gauge( 'systemstate.os_major_version', self.get_os_major_version(), tags=tags )
The data will be sent to Datadog under names starting with systemstate.
Using the check
To use the check with the Datadog Agent, you have to place it in the
/etc/datadog-agent/checks.d directory and restart the service. Also, each
check requires a minimal configuration file in /etc/datadog-agent/conf.d/. It
can be either a single YAML file SystemState.yaml or a directory
SystemState.d that contains one or more YAML files. Both should be named the
same as the check script. Let's use a single file for this example. In
SystemState.yaml put the following:
init_config:
instances:
  - min_collection_interval: 3600
    tags:
      - env:prod
      - os:debian
instances takes a list of dictionaries. tags is our custom argument
mentioned earlier. min_collection_interval is predefined by Datadog. The check
will be run every hour (3600 seconds) and for each metric it will append the
tags listed for this instance.
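To make the shape of this configuration concrete: each entry under instances reaches the check method as a dictionary. Below is a minimal sketch of the tag handling, assuming a hypothetical helper build_tags (not part of the repository) that copies the configured list before adding the hostname tag:

```python
import socket


def build_tags(instance: dict, hostname: str) -> list:
    """Combine tags from the instance config with a hostname tag."""
    tags = list(instance.get("tags", []))  # copy, leave the config untouched
    tags.append(f"hostname:{hostname}")
    return tags


# The single instance from SystemState.yaml, written as a Python dictionary.
instance = {"min_collection_interval": 3600, "tags": ["env:prod", "os:debian"]}
print(build_tags(instance, socket.gethostname()))
```

An instance without a tags key still works here: the result then contains only the hostname tag.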
Change ownership of those files to dd-agent and restart Datadog Agent for the
changes to take effect.
$ sudo chown dd-agent:dd-agent /etc/datadog-agent/checks.d/SystemState.py
$ sudo chown dd-agent:dd-agent /etc/datadog-agent/conf.d/SystemState.yaml
$ sudo systemctl restart datadog-agent
After a while, you should see the new metrics in Datadog. Create monitors to show red alerts or yellow warnings when the values deviate from the desired state, for example when the number of outdated packages is larger than 20 or the major release of Debian is lower than 10. That way, even if the patching itself is manual, Datadog will automatically keep track of all the servers in your infrastructure. For example, use this Terraform resource to create a monitor:
resource "datadog_monitor" "upgradable-packages" {
  type = "query alert"
  name = "SystemState - Upgradable packages on Production {{ hostname.name }}"
  require_full_window = false
  query = "avg(last_1h):avg:systemstate.upgradable_packages{env:prod} by {hostname} > 20"
  monitor_thresholds {
    critical = 20
    warning = 10
  }
  message = <<-EOT
    {{#is_alert}}
    More than 20 packages are upgradable on {{ hostname.name }}!
    {{/is_alert}}
    {{#is_warning}}
    More than 10 packages are upgradable on {{ hostname.name }}!
    {{/is_warning}}
    {{#is_recovery}}
    {{ hostname.name }} has permissible amount of upgradable packages.
    {{/is_recovery}}
    EOT
}
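The two thresholds can be read as a simple classification of the averaged metric value. Below is a minimal sketch in Python, assuming a hypothetical function monitor_state; it only illustrates the strict "greater than" comparison from the query and does not call Datadog:

```python
def monitor_state(value: float, warning: float = 10, critical: float = 20) -> str:
    """Classify a metric value against the warning and critical thresholds."""
    if value > critical:
        return "alert"
    if value > warning:
        return "warning"
    return "ok"


# Made-up values for the number of upgradable packages on a host.
for packages in (5, 15, 25):
    print(packages, monitor_state(packages))
```

Because the comparison is strict, a host with exactly 20 upgradable packages is still in the warning state under this reading.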
In the next post we will explore how to report the same metrics to CloudWatch.