dotfiles

personal configuration files and scripts
git clone https://tongong.net/git/dotfiles.git
Log | Files | Refs | README

dmenu-network (31029B)


      1 #!/usr/bin/env python3
      2 # encoding:utf8
      3 """NetworkManager command line dmenu script.
      4 
      5 Adding new connections or enabling/disabling networking requires PolicyKit
      6 permissions set up per:
      7 https://wiki.archlinux.org/index.php/NetworkManager#Set_up_PolicyKit_permissions
      8 
      9 OR running the script as root
     10 
     11 Add dmenu formatting options and default terminal if desired to
     12 ~/.config/networkmanager-dmenu/config.ini
     13 
     14 """
     15 import pathlib
     16 import struct
     17 import configparser
     18 import itertools
     19 import locale
     20 import os
     21 from os.path import expanduser
     22 import shlex
     23 import sys
     24 import uuid
     25 from subprocess import Popen, PIPE
     26 
     27 import gi
     28 gi.require_version('NM', '1.0')
     29 from gi.repository import GLib, NM  # pylint: disable=wrong-import-position
     30 
# Environment handed to spawned menu processes: a copy of the current
# environment with LC_ALL forced to the C locale.
ENV = os.environ.copy()
ENV['LC_ALL'] = 'C'
# Encoding used to encode menu input and decode the user's selection.
ENC = locale.getpreferredencoding()

# Shared NetworkManager client, the main loop used to wait for async
# callbacks, and the connection list fetched once at import time.
CLIENT = NM.Client.new(None)
LOOP = GLib.MainLoop()
CONNS = CLIENT.get_connections()

# User configuration; read() is handed a single path in the user's home.
CONF = configparser.ConfigParser()
CONF.read(expanduser("~/.config/networkmanager-dmenu/config.ini"))
     41 
     42 def dmenu_cmd(num_lines, prompt="networks:", active_lines=None):  # pylint: disable=too-many-branches
     43     """Parse config.ini if it exists and add options to the dmenu command
     44 
     45     Args: args - num_lines: number of lines to display
     46                  prompt: prompt to show
     47     Returns: command invocation (as a list of strings) for
     48                 dmenu -l <num_lines> -p <prompt> -i ...
     49 
     50     """
     51     dmenu_command = "dmenu"
     52     if not CONF.sections():
     53         res = [dmenu_command, "-i", "-l", str(num_lines), "-p", str(prompt)]
     54         res.extend(sys.argv[1:])
     55         return res
     56     if CONF.has_section('dmenu'):
     57         args = CONF.items('dmenu')
     58         args_dict = dict(args)
     59         dmenu_args = []
     60         if "dmenu_command" in args_dict:
     61             command = shlex.split(args_dict["dmenu_command"])
     62             dmenu_command = command[0]
     63             dmenu_args = command[1:]
     64             del args_dict["dmenu_command"]
     65         if "p" in args_dict and prompt == "networks:":
     66             prompt = args_dict["p"]
     67             del args_dict["p"]
     68         elif "p" in args_dict:
     69             del args_dict["p"]
     70         if "rofi" in dmenu_command:
     71             lines = "-i -dmenu -lines"
     72             # rofi doesn't support 0 length line, it requires at least -lines=1
     73             # see https://github.com/DaveDavenport/rofi/issues/252
     74             num_lines = num_lines or 1
     75         else:
     76             lines = "-i -l"
     77         if "l" in args_dict:
     78             # rofi doesn't support 0 length line, it requires at least -lines=1
     79             # see https://github.com/DaveDavenport/rofi/issues/252
     80             if "rofi" in dmenu_command:
     81                 args_dict['l'] = min(num_lines, int(args_dict['l'])) or 1
     82             lines = "{} {}".format(lines, args_dict['l'])
     83             del args_dict['l']
     84         else:
     85             lines = "{} {}".format(lines, num_lines)
     86         if "pinentry" in args_dict:
     87             del args_dict["pinentry"]
     88         if "compact" in args_dict:
     89             del args_dict["compact"]
     90         if "wifi_chars" in args_dict:
     91             del args_dict["wifi_chars"]
     92         rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False)
     93         if CONF.has_option('dmenu', 'rofi_highlight'):
     94             del args_dict["rofi_highlight"]
     95         if rofi_highlight is True and "rofi" in dmenu_command:
     96             if active_lines:
     97                 dmenu_args.extend(["-a", ",".join([str(num)
     98                                                    for num in active_lines])])
     99     if prompt == "Passphrase":
    100         if CONF.has_section('dmenu_passphrase'):
    101             args = CONF.items('dmenu_passphrase')
    102             args_dict.update(args)
    103         rofi_obscure = CONF.getboolean('dmenu_passphrase', 'rofi_obscure', fallback=True)
    104         if CONF.has_option('dmenu_passphrase', 'rofi_obscure'):
    105             del args_dict["rofi_obscure"]
    106         if rofi_obscure is True and "rofi" in dmenu_command:
    107             dmenu_args.extend(["-password"])
    108         dmenu_password = CONF.getboolean('dmenu_passphrase', 'dmenu_password', fallback=False)
    109         if CONF.has_option('dmenu_passphrase', 'dmenu_password'):
    110             del args_dict["dmenu_password"]
    111         if dmenu_password is True:
    112             dmenu_args.extend(["-P"])
    113     extras = (["-" + str(k), str(v)] for (k, v) in args_dict.items())
    114     res = [dmenu_command, "-p", str(prompt)]
    115     res.extend(dmenu_args)
    116     res += list(itertools.chain.from_iterable(extras))
    117     res[1:1] = lines.split()
    118     res = list(filter(None, res))  # Remove empty list elements
    119     res.extend(sys.argv[1:])
    120     return res
    121 
    122 
    123 def choose_adapter(client):
    124     """If there is more than one wifi adapter installed, ask which one to use
    125 
    126     """
    127     devices = client.get_devices()
    128     devices = [i for i in devices if i.get_device_type() == NM.DeviceType.WIFI]
    129     if not devices:  # pylint: disable=no-else-return
    130         return None
    131     elif len(devices) == 1:
    132         return devices[0]
    133     device_names = "\n".join([d.get_iface() for d in devices]).encode(ENC)
    134     sel = Popen(dmenu_cmd(len(devices), "CHOOSE ADAPTER:"),
    135                 stdin=PIPE,
    136                 stdout=PIPE,
    137                 env=ENV).communicate(input=device_names)[0].decode(ENC)
    138     if not sel.strip():
    139         sys.exit()
    140     devices = [i for i in devices if i.get_iface() == sel.strip()]
    141     assert len(devices) == 1
    142     return devices[0]
    143 
    144 
    145 def is_modemmanager_installed():
    146     """Check if ModemManager is installed"""
    147     with open(os.devnull) as devnull:
    148         try:
    149             Popen(["ModemManager"], stdout=devnull, stderr=devnull).communicate()
    150         except OSError:
    151             return False
    152         return True
    153 
    154 
    155 def bluetooth_get_enabled():
    156     """Check if bluetooth is enabled via rfkill.
    157 
    158     Returns None if no bluetooth device was found.
    159     """
    160     # See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-rfkill
    161     for path in pathlib.Path('/sys/class/rfkill/').glob('rfkill*'):
    162         if (path / 'type').read_text().strip() == 'bluetooth':
    163             return (path / 'soft').read_text().strip() == '0'
    164     return None
    165 
    166 
    167 def create_other_actions(client):
    168     """Return list of other actions that can be taken
    169 
    170     """
    171     networking_enabled = client.networking_get_enabled()
    172     networking_action = "Disable" if networking_enabled else "Enable"
    173 
    174     wifi_enabled = client.wireless_get_enabled()
    175     wifi_action = "Disable" if wifi_enabled else "Enable"
    176 
    177     bluetooth_enabled = bluetooth_get_enabled()
    178     bluetooth_action = "Disable" if bluetooth_enabled else "Enable"
    179 
    180     actions = [Action("{} Wifi".format(wifi_action), toggle_wifi,
    181                       not wifi_enabled),
    182                Action("{} Networking".format(networking_action),
    183                       toggle_networking, not networking_enabled)]
    184     if bluetooth_enabled is not None:
    185         actions.append(Action("{} Bluetooth".format(bluetooth_action),
    186                               toggle_bluetooth, not bluetooth_enabled))
    187     actions += [Action("Launch Connection Manager", launch_connection_editor),
    188                 Action("Delete a Connection", delete_connection)]
    189     if wifi_enabled:
    190         actions.append(Action("Rescan Wifi Networks", rescan_wifi))
    191     return actions
    192 
    193 
    194 def rescan_wifi():
    195     """
    196     Rescan Wifi Access Points
    197     """
    198     for dev in CLIENT.get_devices():
    199         if gi.repository.NM.DeviceWifi == type(dev):
    200             try:
    201                 dev.request_scan_async(None, rescan_cb, None)
    202                 LOOP.run()
    203             except gi.repository.GLib.Error as err:
    204                 # Too frequent rescan error
    205                 notify("Wifi rescan failed", urgency="critical")
    206                 if not err.code == 6:  # pylint: disable=no-member
    207                     raise err
    208 
    209 
    210 def rescan_cb(dev, res, data):
    211     """Callback for rescan_wifi. Just for notifications
    212 
    213     """
    214     if dev.request_scan_finish(res) is True:
    215         notify("Wifi scan complete")
    216     else:
    217         notify("Wifi scan failed", urgency="critical")
    218     LOOP.quit()
    219 
    220 
    221 def ssid_to_utf8(nm_ap):
    222     """ Convert binary ssid to utf-8 """
    223     ssid = nm_ap.get_ssid()
    224     if not ssid:
    225         return ""
    226     ret = NM.utils_ssid_to_utf8(ssid.get_data())
    227     return ret
    228 
    229 
    230 def prompt_saved(saved_cons):
    231     """Prompt for a saved connection."""
    232     actions = create_saved_actions(saved_cons)
    233     sel = get_selection(actions)
    234     sel()
    235 
    236 
    237 def ap_security(nm_ap):
    238     """Parse the security flags to return a string with 'WPA2', etc. """
    239     flags = nm_ap.get_flags()
    240     wpa_flags = nm_ap.get_wpa_flags()
    241     rsn_flags = nm_ap.get_rsn_flags()
    242     sec_str = ""
    243     if ((flags & getattr(NM, '80211ApFlags').PRIVACY) and
    244             (wpa_flags == 0) and (rsn_flags == 0)):
    245         sec_str += " WEP"
    246     if wpa_flags != 0:
    247         sec_str += " WPA1"
    248     if rsn_flags != 0:
    249         sec_str += " WPA2"
    250     if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X) or
    251             (rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X)):
    252         sec_str += " 802.1X"
    253 
    254     # If there is no security use "--"
    255     if sec_str == "":
    256         sec_str = "--"
    257     return sec_str.lstrip()
    258 
    259 
    260 class Action():  # pylint: disable=too-few-public-methods
    261     """Helper class to execute functions from a string variable"""
    262     def __init__(self,
    263                  name,
    264                  func,
    265                  args=None,
    266                  active=False):
    267         self.name = name
    268         self.func = func
    269         self.is_active = active
    270         if args is None:
    271             self.args = None
    272         elif isinstance(args, list):
    273             self.args = args
    274         else:
    275             self.args = [args]
    276 
    277     def __str__(self):
    278         return self.name
    279 
    280     def __call__(self):
    281         if self.args is None:
    282             self.func()
    283         else:
    284             self.func(*self.args)
    285 
    286 
    287 def process_ap(nm_ap, is_active, adapter):
    288     """Activate/Deactivate a connection and get password if required"""
    289     if is_active:
    290         CLIENT.deactivate_connection_async(nm_ap, None, deactivate_cb, nm_ap)
    291     else:
    292         conns_cur = [i for i in CONNS if
    293                      i.get_setting_wireless() is not None and
    294                      i.get_setting_wireless().get_mac_address() ==
    295                      adapter.get_permanent_hw_address()]
    296         con = nm_ap.filter_connections(conns_cur)
    297         if len(con) > 1:
    298             raise ValueError("There are multiple connections possible")
    299 
    300         if len(con) == 1:
    301             CLIENT.activate_connection_async(con[0], adapter, nm_ap.get_path(),
    302                                              None, activate_cb, nm_ap)
    303         else:
    304             if ap_security(nm_ap) != "--":
    305                 password = get_passphrase()
    306             else:
    307                 password = ""
    308             set_new_connection(nm_ap, password, adapter)
    309     LOOP.run()
    310 
    311 
    312 def activate_cb(dev, res, data):
    313     """Notification if activate connection completed successfully
    314 
    315     """
    316     try:
    317         conn = dev.activate_connection_finish(res)
    318     except GLib.Error:
    319         conn = None
    320     if conn is not None:
    321         notify("Activated {}".format(conn.get_id()))
    322     else:
    323         notify("Problem activating {}".format(data.get_id()),
    324                urgency="critical")
    325     LOOP.quit()
    326 
    327 
    328 def deactivate_cb(dev, res, data):
    329     """Notification if deactivate connection completed successfully
    330 
    331     """
    332     if dev.deactivate_connection_finish(res) is True:
    333         notify("Deactivated {}".format(data.get_id()))
    334     else:
    335         notify("Problem deactivating {}".format(data.get_id()),
    336                urgency="critical")
    337     LOOP.quit()
    338 
    339 
    340 def process_vpngsm(con, activate):
    341     """Activate/deactive VPN or GSM connections"""
    342     if activate:
    343         CLIENT.activate_connection_async(con, None, None,
    344                                          None, activate_cb, con)
    345     else:
    346         CLIENT.deactivate_connection_async(con, None, deactivate_cb, con)
    347     LOOP.run()
    348 
    349 
    350 def create_ap_actions(aps, active_ap, active_connection, adapter):  # pylint: disable=too-many-locals
    351     """For each AP in a list, create the string and its attached function
    352     (activate/deactivate)
    353 
    354     """
    355     active_ap_bssid = active_ap.get_bssid() if active_ap is not None else ""
    356 
    357     names = [ssid_to_utf8(ap) for ap in aps]
    358     max_len_name = max([len(name) for name in names]) if names else 0
    359     secs = [ap_security(ap) for ap in aps]
    360     max_len_sec = max([len(sec) for sec in secs]) if secs else 0
    361 
    362     ap_actions = []
    363 
    364     for nm_ap, name, sec in zip(aps, names, secs):
    365         bars = NM.utils_wifi_strength_bars(nm_ap.get_strength())
    366         wifi_chars = CONF.get("dmenu", "wifi_chars", fallback=False)
    367         if wifi_chars:
    368             bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == '*'])
    369         is_active = nm_ap.get_bssid() == active_ap_bssid
    370         compact = CONF.getboolean("dmenu", "compact", fallback=False)
    371         if compact:
    372             action_name = u"{}  {}  {}".format(name, sec, bars)
    373         else:
    374             action_name = u"{:<{}s}  {:<{}s}  {}".format(name, max_len_name, sec,
    375                                                          max_len_sec, bars)
    376         if is_active:
    377             ap_actions.append(Action(action_name, process_ap,
    378                                      [active_connection, True, adapter],
    379                                      active=True))
    380         else:
    381             ap_actions.append(Action(action_name, process_ap,
    382                                      [nm_ap, False, adapter]))
    383     return ap_actions
    384 
    385 
    386 def create_vpn_actions(vpns, active):
    387     """Create the list of strings to display with associated function
    388     (activate/deactivate) for VPN connections.
    389 
    390     """
    391     active_vpns = [i for i in active if i.get_vpn()]
    392     return _create_vpngsm_actions(vpns, active_vpns, "VPN")
    393 
    394 
    395 def create_wireguard_actions(wgs, active):
    396     """Create the list of strings to display with associated function
    397     (activate/deactivate) for Wireguard connections.
    398 
    399     """
    400     active_wgs = [i for i in active if i.get_connection_type() == "wireguard"]
    401     return _create_vpngsm_actions(wgs, active_wgs, "Wireguard")
    402 
    403 
    404 def create_eth_actions(eths, active):
    405     """Create the list of strings to display with associated function
    406     (activate/deactivate) for Ethernet connections.
    407 
    408     """
    409     active_eths = [i for i in active if 'ethernet' in i.get_connection_type()]
    410     return _create_vpngsm_actions(eths, active_eths, "Eth")
    411 
    412 
    413 def create_gsm_actions(gsms, active):
    414     """Create the list of strings to display with associated function
    415     (activate/deactivate) GSM connections."""
    416     active_gsms = [i for i in active if
    417                    i.get_connection() is not None and
    418                    i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)]
    419     return _create_vpngsm_actions(gsms, active_gsms, "GSM")
    420 
    421 
    422 def create_blue_actions(blues, active):
    423     """Create the list of strings to display with associated function
    424     (activate/deactivate) Bluetooth connections."""
    425     active_blues = [i for i in active if
    426                     i.get_connection() is not None and
    427                     i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]
    428     return _create_vpngsm_actions(blues, active_blues, "Bluetooth")
    429 
    430 
    431 def create_saved_actions(saved):
    432     """Create the list of strings to display with associated function
    433     (activate/deactivate) for VPN connections.
    434 
    435     """
    436     return _create_vpngsm_actions(saved, [], "SAVED")
    437 
    438 
    439 def _create_vpngsm_actions(cons, active_cons, label):
    440     active_con_ids = [a.get_id() for a in active_cons]
    441     actions = []
    442     for con in cons:
    443         is_active = con.get_id() in active_con_ids
    444         action_name = u"{}:{}".format(con.get_id(), label)
    445         if is_active:
    446             active_connection = [a for a in active_cons
    447                                  if a.get_id() == con.get_id()]
    448             if len(active_connection) != 1:
    449                 raise ValueError(u"Multiple active connections match"
    450                                  " the connection: {}".format(con.get_id()))
    451             active_connection = active_connection[0]
    452 
    453             actions.append(Action(action_name, process_vpngsm,
    454                                   [active_connection, False], active=True))
    455         else:
    456             actions.append(Action(action_name, process_vpngsm,
    457                                   [con, True]))
    458     return actions
    459 
    460 
    461 def create_wwan_actions(client):
    462     """Create WWWAN actions
    463 
    464     """
    465     wwan_enabled = client.wwan_get_enabled()
    466     wwan_action = "Disable" if wwan_enabled else "Enable"
    467     return [Action("{} WWAN".format(wwan_action), toggle_wwan, not wwan_enabled)]
    468 
    469 
    470 def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved):
    471     """Combine all given actions into a list of actions.
    472 
    473     Args: args - eths: list of Actions
    474                  aps: list of Actions
    475                  vpns: list of Actions
    476                  gsms: list of Actions
    477                  blues: list of Actions
    478                  wwan: list of Actions
    479                  others: list of Actions
    480     """
    481     compact = CONF.getboolean("dmenu", "compact", fallback=False)
    482     empty_action = [Action('', None)] if not compact else []
    483     all_actions = []
    484     all_actions += eths + empty_action if eths else []
    485     all_actions += aps + empty_action if aps else []
    486     all_actions += vpns + empty_action if vpns else []
    487     all_actions += wgs + empty_action if wgs else []
    488     all_actions += gsms + empty_action if (gsms and wwan) else []
    489     all_actions += blues + empty_action if blues else []
    490     all_actions += wwan + empty_action if wwan else []
    491     all_actions += others + empty_action if others else []
    492     all_actions += saved + empty_action if saved else []
    493     return all_actions
    494 
    495 
    496 def get_selection(all_actions):
    497     """Spawn dmenu for selection and execute the associated action."""
    498     rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False)
    499     inp = []
    500 
    501     if rofi_highlight is True:
    502         inp = [str(action) for action in all_actions]
    503     else:
    504         inp = [('== ' if action.is_active else '   ') + str(action)
    505                for action in all_actions]
    506     active_lines = [index for index, action in enumerate(all_actions)
    507                     if action.is_active]
    508 
    509     inp_bytes = "\n".join(inp).encode(ENC)
    510     command = dmenu_cmd(len(inp), active_lines=active_lines)
    511     sel = Popen(command, stdin=PIPE, stdout=PIPE,
    512                 env=ENV).communicate(input=inp_bytes)[0].decode(ENC)
    513 
    514     if not sel.rstrip():
    515         sys.exit()
    516 
    517     if rofi_highlight is False:
    518         action = [i for i in all_actions
    519                   if ((str(i).strip() == str(sel.strip())
    520                        and not i.is_active) or
    521                       ('== ' + str(i) == str(sel.rstrip('\n'))
    522                        and i.is_active))]
    523     else:
    524         action = [i for i in all_actions if str(i).strip() == sel.strip()]
    525     assert len(action) == 1, \
    526         u"Selection was ambiguous: '{}'".format(str(sel.strip()))
    527     return action[0]
    528 
    529 
    530 def toggle_networking(enable):
    531     """Enable/disable networking
    532 
    533     Args: enable - boolean
    534 
    535     """
    536     toggle = GLib.Variant.new_tuple(GLib.Variant.new_boolean(enable))
    537     try:
    538         CLIENT.dbus_call(NM.DBUS_PATH, NM.DBUS_INTERFACE, "Enable", toggle,
    539                          None, -1, None, None, None)
    540     except AttributeError:
    541         # Workaround for older versions of python-gobject
    542         CLIENT.networking_set_enabled(enable)
    543     notify("Networking {}".format("enabled" if enable is True else "disabled"))
    544 
    545 
    546 def toggle_wifi(enable):
    547     """Enable/disable Wifi
    548 
    549     Args: enable - boolean
    550 
    551     """
    552     toggle = GLib.Variant.new_boolean(enable)
    553     try:
    554         CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WirelessEnabled", toggle,
    555                                  -1, None, None, None)
    556     except AttributeError:
    557         # Workaround for older versions of python-gobject
    558         CLIENT.wireless_set_enabled(enable)
    559     notify("Wifi {}".format("enabled" if enable is True else "disabled"))
    560 
    561 
    562 def toggle_wwan(enable):
    563     """Enable/disable WWAN
    564 
    565     Args: enable - boolean
    566 
    567     """
    568     toggle = GLib.Variant.new_boolean(enable)
    569     try:
    570         CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle,
    571                                  -1, None, None, None)
    572     except AttributeError:
    573         # Workaround for older versions of python-gobject
    574         CLIENT.wwan_set_enabled(enable)
    575     notify("Wwan {}".format("enabled" if enable is True else "disabled"))
    576 
    577 
    578 def toggle_bluetooth(enable):
    579     """Enable/disable Bluetooth
    580 
    581     Args: enable - boolean
    582 
    583     References:
    584     https://github.com/blueman-project/blueman/blob/master/blueman/plugins/mechanism/RfKill.py
    585     https://www.kernel.org/doc/html/latest/driver-api/rfkill.html
    586     https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/rfkill.h?h=v5.8.9
    587 
    588     """
    589     type_bluetooth = 2
    590     op_change_all = 3
    591     idx = 0
    592     soft_state = 0 if enable else 1
    593     hard_state = 0
    594 
    595     data = struct.pack("IBBBB", idx, type_bluetooth, op_change_all,
    596                        soft_state, hard_state)
    597 
    598     try:
    599         with open('/dev/rfkill', 'r+b', buffering=0) as rff:
    600             rff.write(data)
    601     except PermissionError:
    602         notify("Lacking permission to write to /dev/rfkill.",
    603                "Maybe you need to add your user to the 'rfkill' group?",
    604                urgency="critical")
    605     else:
    606         notify("Bluetooth {}".format("enabled" if enable else "disabled"))
    607 
    608 
    609 def launch_connection_editor():
    610     """Launch nmtui or the gui nm-connection-editor
    611 
    612     """
    613     terminal = CONF.get("editor", "terminal", fallback="xterm")
    614     gui_if_available = CONF.getboolean("editor", "gui_if_available", fallback=True)
    615     if gui_if_available is True:
    616         try:
    617             Popen(["gnome-control-center", "network"]).communicate()
    618         except OSError:
    619             try:
    620                 Popen(["nm-connection-editor"]).communicate()
    621             except OSError:
    622                 Popen([terminal, "-e", "nmtui"]).communicate()
    623     else:
    624         Popen([terminal, "-e", "nmtui"]).communicate()
    625 
    626 
    627 def get_passphrase():
    628     """Get a password
    629 
    630     Returns: string
    631 
    632     """
    633     pinentry = CONF.get("dmenu", "pinentry", fallback=None)
    634     if pinentry:
    635         pin = ""
    636         out = Popen(pinentry,
    637                     stdout=PIPE,
    638                     stdin=PIPE).communicate(input=b'setdesc Get network password\ngetpin\n')[0]
    639         if out:
    640             res = out.decode(ENC).split("\n")[2]
    641             if res.startswith("D "):
    642                 pin = res.split("D ")[1]
    643         return pin
    644     return Popen(dmenu_cmd(0, "Passphrase"),
    645                  stdin=PIPE, stdout=PIPE).communicate()[0].decode(ENC)
    646 
    647 
    648 def delete_connection():
    649     """Display list of NM connections and delete the selected one
    650 
    651     """
    652     conn_acts = [Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS]
    653     conn_names = "\n".join([str(i) for i in conn_acts]).encode(ENC)
    654     sel = Popen(dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"),
    655                 stdin=PIPE,
    656                 stdout=PIPE,
    657                 env=ENV).communicate(input=conn_names)[0].decode(ENC)
    658     if not sel.strip():
    659         sys.exit()
    660     action = [i for i in conn_acts if str(i) == sel.rstrip("\n")]
    661     assert len(action) == 1, u"Selection was ambiguous: {}".format(str(sel))
    662     action[0]()
    663     LOOP.run()
    664 
    665 
    666 def delete_cb(dev, res, data):
    667     """Notification if delete completed successfully
    668 
    669     """
    670     if dev.delete_finish(res) is True:
    671         notify("Deleted {}".format(dev.get_id()))
    672     else:
    673         notify("Problem deleting {}".format(dev.get_id()), urgency="critical")
    674     LOOP.quit()
    675 
    676 
    677 def set_new_connection(nm_ap, nm_pw, adapter):
    678     """Setup a new NetworkManager connection
    679 
    680     Args: ap - NM.AccessPoint
    681           pw - string
    682 
    683     """
    684     nm_pw = str(nm_pw).strip()
    685     profile = create_wifi_profile(nm_ap, nm_pw, adapter)
    686     CLIENT.add_and_activate_connection_async(profile, adapter, nm_ap.get_path(),
    687                                              None, verify_conn, profile)
    688     LOOP.run()
    689 
    690 
    691 def create_wifi_profile(nm_ap, password, adapter):
    692     # pylint: disable=C0301
    693     # From https://cgit.freedesktop.org/NetworkManager/NetworkManager/tree/examples/python/gi/add_connection.py
    694     # and https://cgit.freedesktop.org/NetworkManager/NetworkManager/tree/examples/python/dbus/add-wifi-psk-connection.py
    695     # pylint: enable=C0301
    696     """Create the NM profile given the AP and passphrase"""
    697     ap_sec = ap_security(nm_ap)
    698     profile = NM.SimpleConnection.new()
    699 
    700     s_con = NM.SettingConnection.new()
    701     s_con.set_property(NM.SETTING_CONNECTION_ID, ssid_to_utf8(nm_ap))
    702     s_con.set_property(NM.SETTING_CONNECTION_UUID, str(uuid.uuid4()))
    703     s_con.set_property(NM.SETTING_CONNECTION_TYPE, "802-11-wireless")
    704     profile.add_setting(s_con)
    705 
    706     s_wifi = NM.SettingWireless.new()
    707     s_wifi.set_property(NM.SETTING_WIRELESS_SSID, nm_ap.get_ssid())
    708     s_wifi.set_property(NM.SETTING_WIRELESS_MODE, 'infrastructure')
    709     s_wifi.set_property(NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address())
    710     profile.add_setting(s_wifi)
    711 
    712     s_ip4 = NM.SettingIP4Config.new()
    713     s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
    714     profile.add_setting(s_ip4)
    715 
    716     s_ip6 = NM.SettingIP6Config.new()
    717     s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
    718     profile.add_setting(s_ip6)
    719 
    720     if ap_sec != "--":
    721         s_wifi_sec = NM.SettingWirelessSecurity.new()
    722         if "WPA" in ap_sec:
    723             s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
    724                                     "wpa-psk")
    725             s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG,
    726                                     "open")
    727             s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, password)
    728         elif "WEP" in ap_sec:
    729             s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
    730                                     "None")
    731             s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE,
    732                                     NM.WepKeyType.PASSPHRASE)
    733             s_wifi_sec.set_wep_key(0, password)
    734         profile.add_setting(s_wifi_sec)
    735 
    736     return profile
    737 
    738 
    739 def verify_conn(client, result, data):
    740     """Callback function for add_and_activate_connection_async
    741 
    742     Check if connection completes successfully. Delete the connection if there
    743     is an error.
    744 
    745     """
    746     try:
    747         act_conn = client.add_and_activate_connection_finish(result)
    748         conn = act_conn.get_connection()
    749         if not all([conn.verify(),
    750                     conn.verify_secrets(),
    751                     data.verify(),
    752                     data.verify_secrets()]):
    753             raise GLib.Error
    754         notify("Added {}".format(conn.get_id()))
    755     except GLib.Error:  # pylint: disable=catching-non-exception
    756         try:
    757             notify("Connection to {} failed".format(conn.get_id()),
    758                    urgency="critical")
    759             conn.delete_async(None, None, None)
    760         except UnboundLocalError:
    761             pass
    762     finally:
    763         LOOP.quit()
    764 
    765 
    766 def create_ap_list(adapter, active_connections):
    767     """Generate list of access points. Remove duplicate APs , keeping strongest
    768     ones and the active AP
    769 
    770     Args: adapter
    771           active_connections - list of all active connections
    772     Returns: aps - list of access points
    773              active_ap - active AP
    774              active_ap_con - active Connection
    775              adapter
    776 
    777     """
    778     aps = []
    779     ap_names = []
    780     active_ap = adapter.get_active_access_point()
    781     aps_all = sorted(adapter.get_access_points(),
    782                      key=lambda a: a.get_strength(), reverse=True)
    783     conns_cur = [i for i in CONNS if
    784                  i.get_setting_wireless() is not None and
    785                  i.get_setting_wireless().get_mac_address() ==
    786                  adapter.get_permanent_hw_address()]
    787     try:
    788         ap_conns = active_ap.filter_connections(conns_cur)
    789         active_ap_name = ssid_to_utf8(active_ap)
    790         active_ap_con = [active_conn for active_conn in active_connections
    791                          if active_conn.get_connection() in ap_conns]
    792     except AttributeError:
    793         active_ap_name = None
    794         active_ap_con = []
    795     if len(active_ap_con) > 1:
    796         raise ValueError("Multiple connection profiles match"
    797                          " the wireless AP")
    798     active_ap_con = active_ap_con[0] if active_ap_con else None
    799     for nm_ap in aps_all:
    800         ap_name = ssid_to_utf8(nm_ap)
    801         if nm_ap != active_ap and ap_name == active_ap_name:
    802             # Skip adding AP if it's not active but same name as active AP
    803             continue
    804         if ap_name not in ap_names:
    805             ap_names.append(ap_name)
    806             aps.append(nm_ap)
    807     return aps, active_ap, active_ap_con, adapter
    808 
    809 
    810 def notify(message, details=None, urgency="low"):
    811     """Use notify-send if available for notifications
    812 
    813     """
    814     args = ["-u", urgency, message]
    815     if details is not None:
    816         args.append(details)
    817 
    818     try:
    819         Popen(["notify-send"] + args,
    820               stdout=PIPE, stderr=PIPE).communicate()
    821     except FileNotFoundError:
    822         pass
    823 
    824 
    825 def run():
    826     """Main script entrypoint"""
    827     active = CLIENT.get_active_connections()
    828     adapter = choose_adapter(CLIENT)
    829     if adapter:
    830         ap_actions = create_ap_actions(*create_ap_list(adapter, active))
    831     else:
    832         ap_actions = []
    833 
    834     vpns = [i for i in CONNS if i.is_type(NM.SETTING_VPN_SETTING_NAME)]
    835     try:
    836         wgs = [i for i in CONNS if i.is_type(NM.SETTING_WIREGUARD_SETTING_NAME)]
    837     except AttributeError:
    838         # Workaround for older versions of python-gobject with no wireguard support
    839         wgs = []
    840     eths = [i for i in CONNS if i.is_type(NM.SETTING_WIRED_SETTING_NAME)]
    841     blues = [i for i in CONNS if i.is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]
    842 
    843     vpn_actions = create_vpn_actions(vpns, active)
    844     wg_actions = create_wireguard_actions(wgs, active)
    845     eth_actions = create_eth_actions(eths, active)
    846     blue_actions = create_blue_actions(blues, active)
    847     other_actions = create_other_actions(CLIENT)
    848     wwan_installed = is_modemmanager_installed()
    849     if wwan_installed:
    850         gsms = [i for i in CONNS if i.is_type(NM.SETTING_GSM_SETTING_NAME)]
    851         gsm_actions = create_gsm_actions(gsms, active)
    852         wwan_actions = create_wwan_actions(CLIENT)
    853     else:
    854         gsm_actions = []
    855         wwan_actions = []
    856 
    857     list_saved = CONF.getboolean('dmenu', 'list_saved', fallback=False)
    858     saved_cons = [i for i in CONNS if i not in vpns + wgs + eths + blues]
    859     if list_saved:
    860         saved_actions = create_saved_actions(saved_cons)
    861     else:
    862         saved_actions = [Action("Saved connections", prompt_saved, [saved_cons])]
    863 
    864     actions = combine_actions(eth_actions, ap_actions, vpn_actions, wg_actions,
    865                               gsm_actions, blue_actions, wwan_actions,
    866                               other_actions, saved_actions)
    867     sel = get_selection(actions)
    868     sel()
    869 
    870 
# Call run() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    run()
    873 
    874 # vim: set et ts=4 sw=4 :