diff options
| author | David Vazgenovich Shakaryan <dvshakaryan@gmail.com> | 2025-12-05 20:29:28 -0800 |
|---|---|---|
| committer | David Vazgenovich Shakaryan <dvshakaryan@gmail.com> | 2025-12-05 20:29:28 -0800 |
| commit | dc0b6a8ae98f8d830bc2a3ff385e3ff256a0a7f9 (patch) | |
| tree | baba3c11ff8a10d291a31041403ebfb55059a0ed | |
| download | wg-genconf-master.tar.gz wg-genconf-master.tar.xz | |
| -rwxr-xr-x | wg.py | 278 |
1 files changed, 278 insertions, 0 deletions
@@ -0,0 +1,278 @@ +#!/usr/bin/env python +# +# Copyright 2025 David Vazgenovich Shakaryan + +import copy +import io +import ipaddress +import pathlib +import os +import re +import shutil +import sys +import tomllib + +def deep_merge(d, src): + for k, v in src.items(): + if isinstance(v, dict) and (dv := d.get(k)): + deep_merge(dv, v) + else: + d[k] = v + return d + +def peer_ip_interfaces(network, id_, version=None): + return [ + ipaddress.ip_interface( + f'{x[int(id_) if x.version == 4 else int(str(id_), 16)]}' + f'/{x.prefixlen}') + for x in config['net'][network] + if not version or x.version == int(version)] + +# given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3 +# {peer4/24} = 10.0.0.0/24 +# {peer4/28} = 10.0.0.16/28 +# {peer4} = 10.0.0.20/32 +# {peer6/64} = fc00:ff:ff:dead::/64 +# {peer6/96} = fc00:ff:ff:dead:beef:a1::/96 +# {peer6} = fc00:ff:ff:dead:beef:a1:b2:c3/128 +# {peer} = 10.0.0.20/32, fc00:ff:ff:dead:beef:a1:b2:c3/128 +# +# a subnet size of '-', e.g. {peer/-}, will take the subnet sizes from the +# network configuration. +# +# by default, this returns network addresses with host bits removed. +# passing interface=True will maintain the host bits. +def ipspec_to_ips(network, ipspec, peer, interface=False): + if not (m := re.fullmatch(r'\{peer(.)?(/.*)?\}', ipspec)): + return [ipspec] + + version = m[1] + subnet = m[2] + + f = (ipaddress.ip_interface if interface else + lambda x: ipaddress.ip_network(x, False)) + + return [ + str(f(x if subnet == '/-' else f'{x.ip}{subnet if subnet else ''}')) + for x in peer_ip_interfaces(network, peer['id'], version)] + +def ipspecs_to_ips(network, ipspecs, peer, interface=False): + return [ip + for ipspec in ipspecs + for ip in ipspec_to_ips(net_name, ipspec, peer, interface=interface)] + +def expand_auto_peer(net_name, local_name): + clients = list(dict.fromkeys( + peer_name + for peer_name in config['peer'] + for peer_if_conf in peer_conf(net_name, peer_name)['if'].values() + for peer_if_peer in peer_if_conf['peers'] + if peer_if_peer.get('name') == local_name)) + + return [{'name': x, 'ips': ['{peer}']} for x in clients] + +def gc_if_wgquick_add_peer(buf, net_name, peerspec): + peer = peer_conf(net_name, peerspec['name']) + + buf.write( + '\n' + f'# {peer['name']}\n' + '[Peer]\n' + f'PublicKey = {peer['pubkey']}\n') + for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer): + buf.write(f'AllowedIPs = {ip}\n') + if (host := peer.get('host')): + port = peer.get('port', 51820) + buf.write(f'Endpoint = {host}:{port}\n') + +def gc_if_wgquick(local, net_name, if_conf): + buf = io.StringIO() + privkey = config['privkey'].get(net_name, 'FIXME') + + buf.write( + '[Interface]\n' + f'PrivateKey = {privkey}\n') + for addr in ipspecs_to_ips( + net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True): + buf.write(f'Address = {addr}\n') + if (port := if_conf.get('port')) or (port := local.get('port')): + if port != 'auto': + buf.write(f'ListenPort = {port}\n') + if (fwmark := local.get('fwmark')): + buf.write(f'FwMark = {fwmark}\n') + + for peerspec in if_conf['peers']: + if peerspec.get('auto'): + for auto_spec in expand_auto_peer(net_name, local['name']): + gc_if_wgquick_add_peer(buf, net_name, auto_spec) + else: + gc_if_wgquick_add_peer(buf, net_name, peerspec) + + return buf + +def gc_if_systemd_network(local, net_name, netif_name): + buf = io.StringIO() + + buf.write( + '[Match]\n' + f'Name={netif_name}\n' + '\n' + '[Network]\n' + 'IPMasquerade=both\n') + for addr in ipspecs_to_ips( + net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True): + buf.write(f'Address={addr}\n') + + return buf + +def gc_if_systemd_netdev_add_peer(buf, net_name, peerspec): + peer = peer_conf(net_name, peerspec['name']) + + buf.write( + '\n' + f'# {peer['name']}\n' + '[WireGuardPeer]\n' + f'PublicKey={peer['pubkey']}\n') + for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer): + buf.write(f'AllowedIPs={ip}\n') + if (host := peer.get('host')): + port = peer.get('port', 51820) + buf.write(f'Endpoint={host}:{port}\n') + +def gc_if_systemd_netdev(local, net_name, if_conf, netif_name): + buf = io.StringIO() + privkey = config['privkey'].get(net_name, 'FIXME') + + buf.write( + '[NetDev]\n' + f'Name={netif_name}\n' + 'Kind=wireguard\n' + f'Description=WireGuard tunnel {netif_name}\n' + '\n' + '[WireGuard]\n' + f'PrivateKey={privkey}\n') + if (port := if_conf.get('port')) or (port := local.get('port')): + if port != 'auto': + buf.write(f'ListenPort={port}\n') + if (fwmark := local.get('fwmark')): + buf.write(f'FirewallMark={fwmark}\n') + + for peerspec in if_conf['peers']: + if peerspec.get('auto'): + for auto_spec in expand_auto_peer(net_name, local['name']): + gc_if_systemd_netdev_add_peer(buf, net_name, auto_spec) + else: + gc_if_systemd_netdev_add_peer(buf, net_name, peerspec) + + return buf + +def buf_to_file(buf, path, mode=None): + print(f'Creating file {path}') + + if mode: + path = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=mode) + with open(path, 'w') as f: + buf.seek(0) + shutil.copyfileobj(buf, f) + +def create_if_files(local, net_name, if_name, if_conf): + netif_name = f'{local.get('prefix', '')}{net_name}' + if if_name: + netif_name += f'-{if_name}' + + if if_conf.get('type') == 'systemd': + buf_to_file( + gc_if_systemd_netdev(local, net_name, if_conf, netif_name), + f'out/{netif_name}.netdev', + mode=0o640) + buf_to_file( + gc_if_systemd_network(local, net_name, netif_name), + f'out/{netif_name}.network') + else: + buf_to_file( + gc_if_wgquick(local, net_name, if_conf), + f'out/{netif_name}.conf') + +def peer_conf(net_name, peer_name): + if not (peer := config['peer'].get(peer_name)): + return + + if (nets := peer.get('net')) and (net := nets.get(net_name)): + return net + + return peer['default'] + +def process_config_networks(raw_config): + return { + k: [ipaddress.ip_network(subnet) for subnet in v['subnets']] + for k, v in raw_config['net'].items()} + +def process_config_peer_net(raw_config, peer_conf): + peer_conf.setdefault('if', {}) + if (ifsets := peer_conf.get('ifsets')): + for ifset_name in ifsets: + for if_name, if_conf in ( + raw_config['ifset'][ifset_name]['if'].items()): + peer_conf['if'].setdefault(if_name, copy.deepcopy(if_conf)) + + # remove self from if peers + ifs = peer_conf['if'] + for if_name, if_conf in list(ifs.items()): + filt_peers = [ + x for x in if_conf['peers'] if x.get('name') != peer_conf['name']] + if filt_peers: + if_conf['peers'] = filt_peers + else: + del ifs[if_name] + + return peer_conf + +def process_config_peers(raw_config): + peers = {} + for peer_name, peer_conf in copy.deepcopy(raw_config['peer']).items(): + peer_conf['name'] = peer_name + peer_conf_net = peer_conf.pop('net', {}) + + nets = { + net_name: process_config_peer_net( + raw_config, + deep_merge(copy.deepcopy(peer_conf), net_conf)) + for net_name, net_conf in peer_conf_net.items()} + # do this last because it mutates peer_conf + default = process_config_peer_net(raw_config, peer_conf) + + peers[peer_name] = { 'default': default, 'net': nets } + + return peers + +def process_config(raw_config): + return { + 'net': process_config_networks(raw_config), + 'peer': process_config_peers(raw_config), + } + +def get_privkeys(): + privkeys = {} + for path in pathlib.Path.home().joinpath('.wg-genconf').glob('*.privkey'): + with open(path, 'r') as f: + privkeys[path.stem] = f.read().rstrip() + return privkeys + +conf_file = sys.argv[1] +local_name = sys.argv[2] + +with open(conf_file, 'rb') as f: + raw_config = tomllib.load(f) +config = process_config(raw_config) +config['privkey'] = get_privkeys() + +if not (local_peer := config['peer'].get(local_name)): + sys.exit(f"peer '{local_name}' not configured") + +for net_name in local_peer['net']: + local = peer_conf(net_name, local_name) + for if_name, if_conf in local['if'].items(): + # file=false allows associating peers without creating a config file, + # useful when an auto-peer interface is being separately created + if if_conf.get('file', True): + create_if_files(local, net_name, if_name, if_conf) |
