#!/usr/bin/env python # # Copyright 2025 David Vazgenovich Shakaryan import collections import copy import io import ipaddress import pathlib import os import re import shutil import sys import tomllib def deep_merge(d, src): for k, v in src.items(): if isinstance(v, dict) and (dv := d.get(k)): deep_merge(dv, v) else: d[k] = v return d def peer_ip_interfaces(network, id_, version=None): return [ ipaddress.ip_interface( f'{x[int(id_) if x.version == 4 else int(str(id_), 16)]}' f'/{x.prefixlen}') for x in config['net'][network] if not version or x.version == int(version)] # given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3 # {peer4/24} = 10.0.0.0/24 # {peer4/28} = 10.0.0.16/28 # {peer4} = 10.0.0.20/32 # {peer6/64} = fc00:ff:ff:dead::/64 # {peer6/96} = fc00:ff:ff:dead:beef:a1::/96 # {peer6} = fc00:ff:ff:dead:beef:a1:b2:c3/128 # {peer} = 10.0.0.20/32, fc00:ff:ff:dead:beef:a1:b2:c3/128 # # a subnet size of '-', e.g. {peer/-}, will take the subnet sizes from the # network configuration. # # by default, this returns network addresses with host bits removed. # passing interface=True will maintain the host bits. def ipspec_to_ips(network, ipspec, peer, interface=False): if not (m := re.fullmatch(r'\{peer(.)?(/.*)?\}', ipspec)): return [ipspec] version = m[1] subnet = m[2] f = (ipaddress.ip_interface if interface else lambda x: ipaddress.ip_network(x, False)) return [ str(f(x if subnet == '/-' else f'{x.ip}{subnet if subnet else ''}')) for x in peer_ip_interfaces(network, peer['id'], version)] def ipspecs_to_ips(network, ipspecs, peer, interface=False): return [ip for ipspec in ipspecs for ip in ipspec_to_ips(net_name, ipspec, peer, interface=interface)] def expand_auto_peer(net_name, local_name): clients = list(dict.fromkeys( peer_name for peer_name in config['peer'] for peer_if_conf in peer_conf(net_name, peer_name)['if'].values() for peer_if_peer in peer_if_conf['peers'] if peer_if_peer.get('name') == local_name)) return [{'name': x, 'ips': ['{peer}']} for x in clients] def gc_if_wgquick_add_peer(buf, net_name, peerspec): peer = peer_conf(net_name, peerspec['name']) buf.write( '\n' f'# {peer['name']}\n' '[Peer]\n' f'PublicKey = {peer['pubkey']}\n') for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer): buf.write(f'AllowedIPs = {ip}\n') if (host := peer.get('host')): port = peer.get('port', 51820) buf.write(f'Endpoint = {host}:{port}\n') def gc_if_wgquick(local, net_name, if_conf): cm = collections.ChainMap(if_conf, local) buf = io.StringIO() privkey = config['privkey'].get(net_name, 'FIXME') buf.write( '[Interface]\n' f'PrivateKey = {privkey}\n') for addr in ipspecs_to_ips( net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True): buf.write(f'Address = {addr}\n') if (port := cm.get('port')) != 'auto': buf.write(f'ListenPort = {port}\n') if (fwmark := cm.get('fwmark')): buf.write(f'FwMark = {fwmark}\n') for peerspec in if_conf['peers']: if peerspec.get('auto'): for auto_spec in expand_auto_peer(net_name, local['name']): gc_if_wgquick_add_peer(buf, net_name, auto_spec) else: gc_if_wgquick_add_peer(buf, net_name, peerspec) return buf def gc_if_systemd_network(local, net_name, netif_name): buf = io.StringIO() buf.write( '[Match]\n' f'Name={netif_name}\n' '\n' '[Network]\n' 'IPMasquerade=both\n') for addr in ipspecs_to_ips( net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True): buf.write(f'Address={addr}\n') return buf def gc_if_systemd_netdev_add_peer(buf, net_name, peerspec): peer = peer_conf(net_name, peerspec['name']) buf.write( '\n' f'# {peer['name']}\n' '[WireGuardPeer]\n' f'PublicKey={peer['pubkey']}\n') for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer): buf.write(f'AllowedIPs={ip}\n') if (host := peer.get('host')): port = peer.get('port', 51820) buf.write(f'Endpoint={host}:{port}\n') def gc_if_systemd_netdev(local, net_name, if_conf, netif_name): cm = collections.ChainMap(if_conf, local) buf = io.StringIO() privkey = config['privkey'].get(net_name, 'FIXME') buf.write( '[NetDev]\n' f'Name={netif_name}\n' 'Kind=wireguard\n' f'Description=WireGuard tunnel {netif_name}\n' '\n' '[WireGuard]\n' f'PrivateKey={privkey}\n') if (port := cm.get('port')) != 'auto': buf.write(f'ListenPort={port}\n') if (fwmark := cm.get('fwmark')): buf.write(f'FirewallMark={fwmark}\n') for peerspec in if_conf['peers']: if peerspec.get('auto'): for auto_spec in expand_auto_peer(net_name, local['name']): gc_if_systemd_netdev_add_peer(buf, net_name, auto_spec) else: gc_if_systemd_netdev_add_peer(buf, net_name, peerspec) return buf def buf_to_file(buf, path, mode=None): print(f'Creating file {path}') if mode: path = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=mode) with open(path, 'w') as f: buf.seek(0) shutil.copyfileobj(buf, f) def create_if_files(local, net_name, if_name, if_conf): cm = collections.ChainMap(if_conf, local) netif_name = f'{cm.get('prefix', '')}{net_name}' if if_name: netif_name += f'-{if_name}' file_prefix = f'out/{cm.get('file-prefix', '')}' if if_conf.get('type') == 'systemd': buf_to_file( gc_if_systemd_netdev(local, net_name, if_conf, netif_name), f'{file_prefix}{netif_name}.netdev', mode=0o640) buf_to_file( gc_if_systemd_network(local, net_name, netif_name), f'{file_prefix}{netif_name}.network') else: buf_to_file( gc_if_wgquick(local, net_name, if_conf), f'{file_prefix}{netif_name}.conf') def peer_conf(net_name, peer_name): if not (peer := config['peer'].get(peer_name)): return if (nets := peer.get('net')) and (net := nets.get(net_name)): return net return peer['default'] def process_config_networks(raw_config): return { k: [ipaddress.ip_network(subnet) for subnet in v['subnets']] for k, v in raw_config['net'].items()} def process_config_peer_net(raw_config, peer_conf): peer_conf.setdefault('if', {}) if (ifsets := peer_conf.get('ifsets')): for ifset_name in ifsets: for if_name, if_conf in ( raw_config['ifset'][ifset_name]['if'].items()): peer_conf['if'].setdefault(if_name, copy.deepcopy(if_conf)) # remove self from if peers ifs = peer_conf['if'] for if_name, if_conf in list(ifs.items()): filt_peers = [ x for x in if_conf['peers'] if x.get('name') != peer_conf['name']] if filt_peers: if_conf['peers'] = filt_peers else: del ifs[if_name] return peer_conf def process_config_peers(raw_config): peers = {} for peer_name, peer_conf in copy.deepcopy(raw_config['peer']).items(): peer_conf['name'] = peer_name peer_conf_net = peer_conf.pop('net', {}) nets = { net_name: process_config_peer_net( raw_config, deep_merge(copy.deepcopy(peer_conf), net_conf)) for net_name, net_conf in peer_conf_net.items()} # do this last because it mutates peer_conf default = process_config_peer_net(raw_config, peer_conf) peers[peer_name] = { 'default': default, 'net': nets } return peers def process_config(raw_config): return { 'net': process_config_networks(raw_config), 'peer': process_config_peers(raw_config), } def get_privkeys(): privkeys = {} for path in pathlib.Path.home().joinpath('.wg-genconf').glob('*.privkey'): with open(path, 'r') as f: privkeys[path.stem] = f.read().rstrip() return privkeys conf_file = sys.argv[1] local_name = sys.argv[2] with open(conf_file, 'rb') as f: raw_config = tomllib.load(f) config = process_config(raw_config) config['privkey'] = get_privkeys() if not (local_peer := config['peer'].get(local_name)): sys.exit(f"peer '{local_name}' not configured") for net_name in local_peer['net']: local = peer_conf(net_name, local_name) for if_name, if_conf in local['if'].items(): # file=false allows associating peers without creating a config file, # useful when an auto-peer interface is being separately created if if_conf.get('file', True): create_if_files(local, net_name, if_name, if_conf)