summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Vazgenovich Shakaryan <dvshakaryan@gmail.com>2025-12-05 20:29:28 -0800
committerDavid Vazgenovich Shakaryan <dvshakaryan@gmail.com>2025-12-05 20:29:28 -0800
commitdc0b6a8ae98f8d830bc2a3ff385e3ff256a0a7f9 (patch)
treebaba3c11ff8a10d291a31041403ebfb55059a0ed
downloadwg-genconf-master.tar.gz
wg-genconf-master.tar.xz
initial importHEADmaster
-rwxr-xr-xwg.py278
1 files changed, 278 insertions, 0 deletions
diff --git a/wg.py b/wg.py
new file mode 100755
index 0000000..24c2b60
--- /dev/null
+++ b/wg.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python
+#
+# Copyright 2025 David Vazgenovich Shakaryan
+
+import copy
+import io
+import ipaddress
+import pathlib
+import os
+import re
+import shutil
+import sys
+import tomllib
+
+def deep_merge(d, src):
+ for k, v in src.items():
+ if isinstance(v, dict) and (dv := d.get(k)):
+ deep_merge(dv, v)
+ else:
+ d[k] = v
+ return d
+
+def peer_ip_interfaces(network, id_, version=None):
+ return [
+ ipaddress.ip_interface(
+ f'{x[int(id_) if x.version == 4 else int(str(id_), 16)]}'
+ f'/{x.prefixlen}')
+ for x in config['net'][network]
+ if not version or x.version == int(version)]
+
+# given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3
+# {peer4/24} = 10.0.0.0/24
+# {peer4/28} = 10.0.0.16/28
+# {peer4} = 10.0.0.20/32
+# {peer6/64} = fc00:ff:ff:dead::/64
+# {peer6/96} = fc00:ff:ff:dead:beef:a1::/96
+# {peer6} = fc00:ff:ff:dead:beef:a1:b2:c3/128
+# {peer} = 10.0.0.20/32, fc00:ff:ff:dead:beef:a1:b2:c3/128
+#
+# a subnet size of '-', e.g. {peer/-}, will take the subnet sizes from the
+# network configuration.
+#
+# by default, this returns network addresses with host bits removed.
+# passing interface=True will maintain the host bits.
+def ipspec_to_ips(network, ipspec, peer, interface=False):
+ if not (m := re.fullmatch(r'\{peer(.)?(/.*)?\}', ipspec)):
+ return [ipspec]
+
+ version = m[1]
+ subnet = m[2]
+
+ f = (ipaddress.ip_interface if interface else
+ lambda x: ipaddress.ip_network(x, False))
+
+ return [
+ str(f(x if subnet == '/-' else f'{x.ip}{subnet if subnet else ''}'))
+ for x in peer_ip_interfaces(network, peer['id'], version)]
+
+def ipspecs_to_ips(network, ipspecs, peer, interface=False):
+ return [ip
+ for ipspec in ipspecs
+ for ip in ipspec_to_ips(net_name, ipspec, peer, interface=interface)]
+
+def expand_auto_peer(net_name, local_name):
+ clients = list(dict.fromkeys(
+ peer_name
+ for peer_name in config['peer']
+ for peer_if_conf in peer_conf(net_name, peer_name)['if'].values()
+ for peer_if_peer in peer_if_conf['peers']
+ if peer_if_peer.get('name') == local_name))
+
+ return [{'name': x, 'ips': ['{peer}']} for x in clients]
+
+def gc_if_wgquick_add_peer(buf, net_name, peerspec):
+ peer = peer_conf(net_name, peerspec['name'])
+
+ buf.write(
+ '\n'
+ f'# {peer['name']}\n'
+ '[Peer]\n'
+ f'PublicKey = {peer['pubkey']}\n')
+ for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer):
+ buf.write(f'AllowedIPs = {ip}\n')
+ if (host := peer.get('host')):
+ port = peer.get('port', 51820)
+ buf.write(f'Endpoint = {host}:{port}\n')
+
+def gc_if_wgquick(local, net_name, if_conf):
+ buf = io.StringIO()
+ privkey = config['privkey'].get(net_name, 'FIXME')
+
+ buf.write(
+ '[Interface]\n'
+ f'PrivateKey = {privkey}\n')
+ for addr in ipspecs_to_ips(
+ net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True):
+ buf.write(f'Address = {addr}\n')
+ if (port := if_conf.get('port')) or (port := local.get('port')):
+ if port != 'auto':
+ buf.write(f'ListenPort = {port}\n')
+ if (fwmark := local.get('fwmark')):
+ buf.write(f'FwMark = {fwmark}\n')
+
+ for peerspec in if_conf['peers']:
+ if peerspec.get('auto'):
+ for auto_spec in expand_auto_peer(net_name, local['name']):
+ gc_if_wgquick_add_peer(buf, net_name, auto_spec)
+ else:
+ gc_if_wgquick_add_peer(buf, net_name, peerspec)
+
+ return buf
+
+def gc_if_systemd_network(local, net_name, netif_name):
+ buf = io.StringIO()
+
+ buf.write(
+ '[Match]\n'
+ f'Name={netif_name}\n'
+ '\n'
+ '[Network]\n'
+ 'IPMasquerade=both\n')
+ for addr in ipspecs_to_ips(
+ net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True):
+ buf.write(f'Address={addr}\n')
+
+ return buf
+
+def gc_if_systemd_netdev_add_peer(buf, net_name, peerspec):
+ peer = peer_conf(net_name, peerspec['name'])
+
+ buf.write(
+ '\n'
+ f'# {peer['name']}\n'
+ '[WireGuardPeer]\n'
+ f'PublicKey={peer['pubkey']}\n')
+ for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer):
+ buf.write(f'AllowedIPs={ip}\n')
+ if (host := peer.get('host')):
+ port = peer.get('port', 51820)
+ buf.write(f'Endpoint={host}:{port}\n')
+
+def gc_if_systemd_netdev(local, net_name, if_conf, netif_name):
+ buf = io.StringIO()
+ privkey = config['privkey'].get(net_name, 'FIXME')
+
+ buf.write(
+ '[NetDev]\n'
+ f'Name={netif_name}\n'
+ 'Kind=wireguard\n'
+ f'Description=WireGuard tunnel {netif_name}\n'
+ '\n'
+ '[WireGuard]\n'
+ f'PrivateKey={privkey}\n')
+ if (port := if_conf.get('port')) or (port := local.get('port')):
+ if port != 'auto':
+ buf.write(f'ListenPort={port}\n')
+ if (fwmark := local.get('fwmark')):
+ buf.write(f'FirewallMark={fwmark}\n')
+
+ for peerspec in if_conf['peers']:
+ if peerspec.get('auto'):
+ for auto_spec in expand_auto_peer(net_name, local['name']):
+ gc_if_systemd_netdev_add_peer(buf, net_name, auto_spec)
+ else:
+ gc_if_systemd_netdev_add_peer(buf, net_name, peerspec)
+
+ return buf
+
+def buf_to_file(buf, path, mode=None):
+ print(f'Creating file {path}')
+
+ if mode:
+ path = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=mode)
+ with open(path, 'w') as f:
+ buf.seek(0)
+ shutil.copyfileobj(buf, f)
+
+def create_if_files(local, net_name, if_name, if_conf):
+ netif_name = f'{local.get('prefix', '')}{net_name}'
+ if if_name:
+ netif_name += f'-{if_name}'
+
+ if if_conf.get('type') == 'systemd':
+ buf_to_file(
+ gc_if_systemd_netdev(local, net_name, if_conf, netif_name),
+ f'out/{netif_name}.netdev',
+ mode=0o640)
+ buf_to_file(
+ gc_if_systemd_network(local, net_name, netif_name),
+ f'out/{netif_name}.network')
+ else:
+ buf_to_file(
+ gc_if_wgquick(local, net_name, if_conf),
+ f'out/{netif_name}.conf')
+
+def peer_conf(net_name, peer_name):
+ if not (peer := config['peer'].get(peer_name)):
+ return
+
+ if (nets := peer.get('net')) and (net := nets.get(net_name)):
+ return net
+
+ return peer['default']
+
+def process_config_networks(raw_config):
+ return {
+ k: [ipaddress.ip_network(subnet) for subnet in v['subnets']]
+ for k, v in raw_config['net'].items()}
+
+def process_config_peer_net(raw_config, peer_conf):
+ peer_conf.setdefault('if', {})
+ if (ifsets := peer_conf.get('ifsets')):
+ for ifset_name in ifsets:
+ for if_name, if_conf in (
+ raw_config['ifset'][ifset_name]['if'].items()):
+ peer_conf['if'].setdefault(if_name, copy.deepcopy(if_conf))
+
+ # remove self from if peers
+ ifs = peer_conf['if']
+ for if_name, if_conf in list(ifs.items()):
+ filt_peers = [
+ x for x in if_conf['peers'] if x.get('name') != peer_conf['name']]
+ if filt_peers:
+ if_conf['peers'] = filt_peers
+ else:
+ del ifs[if_name]
+
+ return peer_conf
+
+def process_config_peers(raw_config):
+ peers = {}
+ for peer_name, peer_conf in copy.deepcopy(raw_config['peer']).items():
+ peer_conf['name'] = peer_name
+ peer_conf_net = peer_conf.pop('net', {})
+
+ nets = {
+ net_name: process_config_peer_net(
+ raw_config,
+ deep_merge(copy.deepcopy(peer_conf), net_conf))
+ for net_name, net_conf in peer_conf_net.items()}
+ # do this last because it mutates peer_conf
+ default = process_config_peer_net(raw_config, peer_conf)
+
+ peers[peer_name] = { 'default': default, 'net': nets }
+
+ return peers
+
+def process_config(raw_config):
+ return {
+ 'net': process_config_networks(raw_config),
+ 'peer': process_config_peers(raw_config),
+ }
+
+def get_privkeys():
+ privkeys = {}
+ for path in pathlib.Path.home().joinpath('.wg-genconf').glob('*.privkey'):
+ with open(path, 'r') as f:
+ privkeys[path.stem] = f.read().rstrip()
+ return privkeys
+
+conf_file = sys.argv[1]
+local_name = sys.argv[2]
+
+with open(conf_file, 'rb') as f:
+ raw_config = tomllib.load(f)
+config = process_config(raw_config)
+config['privkey'] = get_privkeys()
+
+if not (local_peer := config['peer'].get(local_name)):
+ sys.exit(f"peer '{local_name}' not configured")
+
+for net_name in local_peer['net']:
+ local = peer_conf(net_name, local_name)
+ for if_name, if_conf in local['if'].items():
+ # file=false allows associating peers without creating a config file,
+ # useful when an auto-peer interface is being separately created
+ if if_conf.get('file', True):
+ create_if_files(local, net_name, if_name, if_conf)