File: //sbin/imunify360-watchdog
#!/opt/imunify360/venv/bin/python3
import json
import logging
import logging.handlers
import os.path
import shutil
import socket
import subprocess
import sys
import time
from typing import Optional
from defence360agent import sentry
# Errors raised inside logging handlers are silently ignored instead of
# printing a traceback to stderr.
logging.raiseExceptions = False

# --- Agent RPC over the local UNIX socket -----------------------------------
SOCKET_PATH = '/var/run/defence360agent/simple_rpc.sock'
CONNECT_TIMEOUT = 10  # seconds allowed for connecting to SOCKET_PATH
REQUEST_TIMEOUT = 60  # seconds budget for sending a request and reading reply
RETRY_DELAY = 10  # seconds to pause between failed health RPC attempts

LICENSE = '/var/imunify360/license.json'

# --- Service control ---------------------------------------------------------
IMUNIFY360 = 'imunify360'
SERVICE = 'service'  # fallback command when systemctl is not found
STATUS = 'status'
RESTART = 'restart'
SHOW = 'show'
SUBPROCESS_TIMEOUT = 1800  # seconds before a service command is abandoned

# --- Database migrations -------------------------------------------------------
# Substring of the unit's StatusText while migrations are running.
AGENT_IN_MIGRATION_STATE = 'Applying database migrations'
MIGRATION_TIMEOUT = 4 * 3600  # 4 hours, in seconds
def run(cmd, *, timeout=SUBPROCESS_TIMEOUT, check=False, **kwargs):
    """Execute *cmd* via :func:`subprocess.run` without raising on timeout.

    When the command exceeds *timeout* seconds, a ``CompletedProcess`` with
    ``returncode=None`` is returned in place of ``subprocess.TimeoutExpired``.
    It carries the ``cmd``, ``stdout`` and ``stderr`` stored on the timeout
    exception. All other arguments are forwarded unchanged, so ``check=True``
    still raises ``CalledProcessError`` for a non-zero exit.
    """
    try:
        completed = subprocess.run(cmd, timeout=timeout, check=check, **kwargs)
    except subprocess.TimeoutExpired as expired:
        completed = subprocess.CompletedProcess(
            expired.cmd,
            returncode=None,
            stdout=expired.stdout,
            stderr=expired.stderr,
        )
    return completed
def service_is_running(systemctl_exec: Optional[str], name: str) -> bool:
    """Return True if the status command for service *name* exits with 0.

    Runs ``<systemctl_exec> status <name>`` when a systemctl path is given,
    otherwise ``service <name> status``. All standard streams are detached.
    A timed-out command (returncode None) therefore counts as not running.
    """
    cmd = ([systemctl_exec, STATUS, name] if systemctl_exec
           else [SERVICE, name, STATUS])
    completed = run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return completed.returncode == 0
def restart_service(systemctl_exec: Optional[str], name: str) -> None:
    """Restart service *name* with [systemctl|service].

    Runs ``<systemctl_exec> restart <name>`` when a systemctl path is given,
    otherwise ``service <name> restart``. All standard streams are detached.
    The exit status is not inspected: the restart is best-effort, and a
    timeout is reported by ``run`` rather than raised.
    """
    if systemctl_exec:
        cmd = [systemctl_exec, RESTART, name]
    else:
        cmd = [SERVICE, name, RESTART]
    run(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL)
def setup_logging(level) -> logging.Logger:
    """Configure and return the ``imunify360-watchdog`` logger.

    Sets the logger to *level* and sends its records to the local syslog
    socket ``/dev/log``, formatted as ``"<logger name>: <message>"``. It also
    calls ``sentry.configure_sentry()``. Repeated calls reuse the existing
    syslog handler instead of attaching another one, which would duplicate
    every log line.
    """
    logger = logging.getLogger('imunify360-watchdog')
    logger.setLevel(level)
    has_syslog = any(isinstance(h, logging.handlers.SysLogHandler)
                     for h in logger.handlers)
    if not has_syslog:
        handler = logging.handlers.SysLogHandler('/dev/log')
        # Use the public API rather than assigning the attribute directly.
        handler.setFormatter(logging.Formatter('%(name)s: %(message)s'))
        logger.addHandler(handler)
    sentry.configure_sentry()
    return logger
def rpc_request(*args, **kwargs):
    """Send an RPC command to the agent and return the ``data`` of its reply.

    Positional *args* become the command list and keyword *kwargs* the
    parameters passed to ``send_to_agent_socket``.

    Returns:
        The reply's ``data`` value, or None if the reply has no ``data`` key.

    Raises:
        ValueError: if the reply is not a JSON object, or its ``result`` is
            missing or not ``'success'``. In the latter case the reply's
            ``messages`` (default ``'Unknown error'``) is the exception
            argument.
    """
    response = send_to_agent_socket(list(args), kwargs)
    if not isinstance(response, dict):
        # Avoid an opaque TypeError/KeyError when indexing a malformed reply.
        raise ValueError(f'Unexpected response from agent: {response!r}')
    if response.get('result') != 'success':
        raise ValueError(response.get('messages', 'Unknown error'))
    return response.get('data')
def send_to_agent_socket(command: list, params: dict):
    """Send one JSON request over the agent's UNIX socket and decode the reply.

    The request is written as a single line ``{"command": ..., "params": ...}``
    terminated by a newline. A single newline-terminated JSON line is read
    back and decoded. Connecting is limited by CONNECT_TIMEOUT seconds, and
    sending is limited by REQUEST_TIMEOUT. Whatever remains of the
    REQUEST_TIMEOUT budget after sending becomes the socket timeout while
    reading the reply.

    Raises:
        socket.timeout: if sending exhausted the budget or a socket operation
            timed out.
        ValueError: if the agent closed the connection without replying.
    """
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
        conn.settimeout(CONNECT_TIMEOUT)
        conn.connect(SOCKET_PATH)
        request = json.dumps({'command': command, 'params': params}) + '\n'
        deadline = time.monotonic() + REQUEST_TIMEOUT
        conn.settimeout(REQUEST_TIMEOUT)
        conn.sendall(request.encode())
        budget = deadline - time.monotonic()
        if budget <= 0:
            raise socket.timeout()
        conn.settimeout(budget)
        with conn.makefile(encoding='utf-8') as reader:
            line = reader.readline()
        if not line:
            raise ValueError('Empty response from socket')
        return json.loads(line)
def rpc_request_with_retries(rpc_timeout: int) -> Optional[dict]:
    """Call the agent's ``health`` RPC, retrying failures for *rpc_timeout* s.

    After each failed attempt the function waits RETRY_DELAY seconds and
    tries again. Once *rpc_timeout* seconds have passed since the first
    attempt, the most recent exception is re-raised.

    Returns:
        Whatever ``rpc_request('health')`` returns on the first success (the
        reply's ``data``, which may be None).
    """
    # Monotonic clock: immune to wall-clock jumps (NTP sync, manual changes)
    # that would otherwise cut the retry window short or stretch it.
    started = time.monotonic()
    while True:
        try:
            return rpc_request('health')
        except Exception:
            if time.monotonic() - started >= rpc_timeout:
                raise
            time.sleep(RETRY_DELAY)
def systemctl_executable() -> Optional[str]:
    """Locate ``systemctl`` on the default search path ``os.defpath``.

    Returns the executable's path, or None when it is not found there. Callers
    then fall back to the ``service`` command.
    """
    found = shutil.which('systemctl', path=os.defpath)
    return found
def service_is_migrating(systemctl_exec, name, logger):
    """Tell whether *name* is applying database migrations within the limit.

    Reads the ``StatusText`` and ``ExecMainStartTimestampMonotonic``
    properties with ``<systemctl> show`` (or ``service <name> show`` when no
    systemctl was found). Return values:

    * True while ``StatusText`` contains AGENT_IN_MIGRATION_STATE and the
      main process started less than MIGRATION_TIMEOUT seconds ago.
    * True if the status says migrating but the start timestamp is missing
      or unparsable, so the agent is not restarted mid-migration.
    * False in all other cases, including empty or non-``key=value`` output
      (e.g. from a ``service`` wrapper with no ``show`` action). Such output
      used to crash the watchdog.

    Durations are logged through *logger*.
    """
    if systemctl_exec:
        cmd = [systemctl_exec, SHOW, name, '-p', 'StatusText',
               '-p', 'ExecMainStartTimestampMonotonic']
    else:
        cmd = [SERVICE, name, SHOW]
    cp = run(cmd, stdin=subprocess.DEVNULL,
             stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    # On timeout run() hands back partial output, which may be None.
    output = (cp.stdout or b'').decode(errors='replace')
    # Parse only the properties of interest; other lines (including any that
    # are not of the form key=value) are ignored.
    params = {}
    for line in output.splitlines():
        key, sep, value = line.partition('=')
        if sep and key in ('StatusText', 'ExecMainStartTimestampMonotonic'):
            params[key] = value
    if AGENT_IN_MIGRATION_STATE not in params.get('StatusText', ''):
        return False
    try:
        # The timestamp is in microseconds and is compared against
        # time.monotonic() (seconds).
        started = int(params['ExecMainStartTimestampMonotonic']) / 1e6
    except (KeyError, ValueError):
        logger.warning('%s is migrating but its start time is unknown', name)
        return True
    migration_duration = time.monotonic() - started
    logger.info("%s migrating for %d sec", name, migration_duration)
    if migration_duration < MIGRATION_TIMEOUT:
        return True
    logger.error("Migration took too long")
    return False
def main(rpc_timeout, log_level=logging.INFO):
    """Run one watchdog pass over the imunify360 service.

    The pass does nothing if the service is not running, or if
    ``service_is_migrating`` reports that it is still applying migrations.
    Otherwise it queries the agent's ``health`` RPC, retrying for up to
    *rpc_timeout* seconds. The service is restarted if:

    * the RPC keeps failing,
    * the RPC returns something other than a dict, or
    * the reply's ``healthy`` flag is falsy.
    """
    logger = setup_logging(log_level)
    systemctl_exec = systemctl_executable()
    if not service_is_running(systemctl_exec, IMUNIFY360):
        logger.info('%s is not running', IMUNIFY360)
        return
    if service_is_migrating(systemctl_exec, IMUNIFY360, logger):
        return
    try:
        response = rpc_request_with_retries(rpc_timeout)
    except Exception:
        logger.exception('Restarting due to RPC failures')
        restart_service(systemctl_exec, IMUNIFY360)
        return
    if not isinstance(response, dict):
        # The RPC yields the reply's "data" field, which can be absent (None).
        # Calling .get() on it would crash the watchdog instead of recovering
        # the agent.
        logger.error('Restarting due to unexpected health response: %r',
                     response)
        restart_service(systemctl_exec, IMUNIFY360)
        return
    if not response.get('healthy', False):
        logger.error('Restarting due to health report: %s',
                     response.get('why'))
        restart_service(systemctl_exec, IMUNIFY360)
    else:
        logger.info('%s is healthy: %s', IMUNIFY360, response.get('why'))
if __name__ == '__main__':
    # The single required argument is the RPC retry budget in seconds.
    # Extra arguments are ignored, as before.
    try:
        rpc_timeout = int(sys.argv[1])
    except (IndexError, ValueError):
        sys.exit('usage: {} RPC_TIMEOUT_SECONDS'.format(sys.argv[0]))
    main(rpc_timeout)