dmenu-network (31029B)
#!/usr/bin/env python3
# encoding:utf8
"""NetworkManager command line dmenu script.

To add new connections or enable/disable networking requires policykit
permissions setup per:
https://wiki.archlinux.org/index.php/NetworkManager#Set_up_PolicyKit_permissions

OR running the script as root

Add dmenu formatting options and default terminal if desired to
~/.config/networkmanager-dmenu/config.ini

"""
import pathlib
import struct
import configparser
import itertools
import locale
import os
from os.path import expanduser
import shlex
import shutil
import sys
import uuid
from subprocess import Popen, PIPE

import gi
gi.require_version('NM', '1.0')
from gi.repository import GLib, NM  # pylint: disable=wrong-import-position

# Environment handed to the menu subprocesses (forces the C locale).
ENV = os.environ.copy()
ENV['LC_ALL'] = 'C'
ENC = locale.getpreferredencoding()

CLIENT = NM.Client.new(None)
LOOP = GLib.MainLoop()
CONNS = CLIENT.get_connections()

CONF = configparser.ConfigParser()
CONF.read(expanduser("~/.config/networkmanager-dmenu/config.ini"))

# Keys of the [dmenu] section that configure this script rather than dmenu
# itself; they must never be forwarded as command line flags.
_SCRIPT_ONLY_OPTIONS = ("pinentry", "compact", "wifi_chars", "list_saved")


def dmenu_cmd(num_lines, prompt="networks:", active_lines=None):  # pylint: disable=too-many-branches
    """Build the dmenu (or rofi) command line, honouring config.ini.

    Args: num_lines - number of lines to display
          prompt - prompt to show
          active_lines - indexes of the entries to highlight (rofi only)
    Returns: command invocation (as a list of strings) for
             dmenu -l <num_lines> -p <prompt> -i ...

    """
    dmenu_command = "dmenu"
    if not CONF.sections():
        res = [dmenu_command, "-i", "-l", str(num_lines), "-p", str(prompt)]
        res.extend(sys.argv[1:])
        return res

    # Start from safe defaults so that a config file without a [dmenu]
    # section (e.g. one holding only [editor]) cannot leave names undefined.
    args_dict = dict(CONF.items('dmenu')) if CONF.has_section('dmenu') else {}
    dmenu_args = []
    if "dmenu_command" in args_dict:
        command = shlex.split(args_dict.pop("dmenu_command"))
        if command:  # an empty value keeps the plain "dmenu" default
            dmenu_command = command[0]
            dmenu_args = command[1:]

    # The configured prompt only replaces the default one; callers that pass
    # their own prompt keep it. Either way "p" is not forwarded as an extra.
    configured_prompt = args_dict.pop("p", None)
    if configured_prompt is not None and prompt == "networks:":
        prompt = configured_prompt

    is_rofi = "rofi" in dmenu_command
    if is_rofi:
        lines = "-i -dmenu -lines"
        # rofi doesn't support 0 length line, it requires at least -lines=1
        # see https://github.com/DaveDavenport/rofi/issues/252
        num_lines = num_lines or 1
    else:
        lines = "-i -l"
    if "l" in args_dict:
        configured_lines = args_dict.pop("l")
        if is_rofi:
            # Same rofi restriction as above: never ask for zero lines.
            configured_lines = min(num_lines, int(configured_lines)) or 1
        lines = "{} {}".format(lines, configured_lines)
    else:
        lines = "{} {}".format(lines, num_lines)

    for option in _SCRIPT_ONLY_OPTIONS:
        args_dict.pop(option, None)

    rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False)
    args_dict.pop("rofi_highlight", None)
    if rofi_highlight is True and is_rofi and active_lines:
        dmenu_args.extend(["-a", ",".join(str(num) for num in active_lines)])

    if prompt == "Passphrase":
        if CONF.has_section('dmenu_passphrase'):
            args_dict.update(CONF.items('dmenu_passphrase'))
        rofi_obscure = CONF.getboolean('dmenu_passphrase', 'rofi_obscure',
                                       fallback=True)
        args_dict.pop("rofi_obscure", None)
        if rofi_obscure is True and is_rofi:
            dmenu_args.extend(["-password"])
        dmenu_password = CONF.getboolean('dmenu_passphrase', 'dmenu_password',
                                         fallback=False)
        args_dict.pop("dmenu_password", None)
        if dmenu_password is True:
            dmenu_args.extend(["-P"])

    # Whatever is left in the section is passed through as "-key value".
    extras = (["-" + str(k), str(v)] for (k, v) in args_dict.items())
    res = [dmenu_command, "-p", str(prompt)]
    res.extend(dmenu_args)
    res += list(itertools.chain.from_iterable(extras))
    res[1:1] = lines.split()
    res = list(filter(None, res))  # Remove empty list elements
    res.extend(sys.argv[1:])
    return res


def choose_adapter(client):
    """If there is more than one wifi adapter installed, ask which one to use

    Args: client - NM.Client
    Returns: the chosen wifi device, or None if there is no wifi device.
             Exits the script if the menu is dismissed without a selection.

    """
    devices = [i for i in client.get_devices()
               if i.get_device_type() == NM.DeviceType.WIFI]
    if not devices:
        return None
    if len(devices) == 1:
        return devices[0]
    device_names = "\n".join([d.get_iface() for d in devices]).encode(ENC)
    sel = Popen(dmenu_cmd(len(devices), "CHOOSE ADAPTER:"),
                stdin=PIPE,
                stdout=PIPE,
                env=ENV).communicate(input=device_names)[0].decode(ENC)
    if not sel.strip():
        sys.exit()
    devices = [i for i in devices if i.get_iface() == sel.strip()]
    assert len(devices) == 1
    return devices[0]


def is_modemmanager_installed():
    """Check if ModemManager is installed

    Looks the executable up on PATH instead of launching the daemon.

    Returns: boolean

    """
    return shutil.which("ModemManager") is not None


def bluetooth_get_enabled():
    """Check if bluetooth is enabled via rfkill.

    Returns None if no bluetooth device was found.
    """
    # See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-rfkill
    for path in pathlib.Path('/sys/class/rfkill/').glob('rfkill*'):
        if (path / 'type').read_text().strip() == 'bluetooth':
            # A soft-block value of '0' means the radio is not blocked.
            return (path / 'soft').read_text().strip() == '0'
    return None


def create_other_actions(client):
    """Return list of other actions that can be taken

    Args: client - NM.Client
    Returns: list of Actions (toggles, connection editor, delete, rescan)

    """
    networking_enabled = client.networking_get_enabled()
    networking_action = "Disable" if networking_enabled else "Enable"

    wifi_enabled = client.wireless_get_enabled()
    wifi_action = "Disable" if wifi_enabled else "Enable"

    bluetooth_enabled = bluetooth_get_enabled()
    bluetooth_action = "Disable" if bluetooth_enabled else "Enable"

    # The third argument is the new state handed to the toggle function.
    actions = [Action("{} Wifi".format(wifi_action), toggle_wifi,
                      not wifi_enabled),
               Action("{} Networking".format(networking_action),
                      toggle_networking, not networking_enabled)]
    if bluetooth_enabled is not None:
        actions.append(Action("{} Bluetooth".format(bluetooth_action),
                              toggle_bluetooth, not bluetooth_enabled))
    actions += [Action("Launch Connection Manager", launch_connection_editor),
                Action("Delete a Connection", delete_connection)]
    if wifi_enabled:
        actions.append(Action("Rescan Wifi Networks", rescan_wifi))
    return actions


def rescan_wifi():
    """
    Rescan Wifi Access Points
    """
    for dev in CLIENT.get_devices():
        if not isinstance(dev, NM.DeviceWifi):
            continue
        try:
            dev.request_scan_async(None, rescan_cb, None)
            LOOP.run()
        except GLib.Error as err:
            # Too frequent rescan error
            notify("Wifi rescan failed", urgency="critical")
            if not err.code == 6:  # pylint: disable=no-member
                raise err


def rescan_cb(dev, res, data):
    """Callback for rescan_wifi. Just for notifications

    """
    try:
        try:
            succeeded = dev.request_scan_finish(res) is True
        except GLib.Error:
            # A failed scan is reported as an exception, not as False.
            succeeded = False
        if succeeded:
            notify("Wifi scan complete")
        else:
            notify("Wifi scan failed", urgency="critical")
    finally:
        # Always release rescan_wifi(), which is blocked in LOOP.run().
        LOOP.quit()


def ssid_to_utf8(nm_ap):
    """ Convert binary ssid to utf-8 """
    ssid = nm_ap.get_ssid()
    if not ssid:
        return ""
    return NM.utils_ssid_to_utf8(ssid.get_data())


def prompt_saved(saved_cons):
    """Prompt for a saved connection and run the selected action."""
    actions = create_saved_actions(saved_cons)
    sel = get_selection(actions)
    sel()


def ap_security(nm_ap):
    """Parse the security flags to return a string with 'WPA2', etc.

    Returns: space separated security names, or "--" if there is none

    """
    flags = nm_ap.get_flags()
    wpa_flags = nm_ap.get_wpa_flags()
    rsn_flags = nm_ap.get_rsn_flags()
    # The NM names start with a digit, hence getattr instead of NM.<name>.
    ap_flags = getattr(NM, '80211ApFlags')
    sec_flags = getattr(NM, '80211ApSecurityFlags')
    sec_str = ""
    if ((flags & ap_flags.PRIVACY) and
            (wpa_flags == 0) and (rsn_flags == 0)):
        sec_str += " WEP"
    if wpa_flags != 0:
        sec_str += " WPA1"
    if rsn_flags != 0:
        sec_str += " WPA2"
    if ((wpa_flags & sec_flags.KEY_MGMT_802_1X) or
            (rsn_flags & sec_flags.KEY_MGMT_802_1X)):
        sec_str += " 802.1X"

    # If there is no security use "--"
    if sec_str == "":
        sec_str = "--"
    return sec_str.lstrip()


class Action():  # pylint: disable=too-few-public-methods
    """Helper class to execute functions from a string variable

    Args: name - text shown in the menu
          func - callable run when the entry is selected
          args - None (call without arguments), a list of positional
                 arguments, or a single value that is wrapped in a list
          active - True if the entry represents an active connection

    """
    def __init__(self,
                 name,
                 func,
                 args=None,
                 active=False):
        self.name = name
        self.func = func
        self.is_active = active
        if args is None:
            self.args = None
        elif isinstance(args, list):
            self.args = args
        else:
            self.args = [args]

    def __str__(self):
        return self.name

    def __call__(self):
        if self.args is None:
            self.func()
        else:
            self.func(*self.args)


def _display_name(obj):
    """Return a printable name for a connection-like object or an AP."""
    get_id = getattr(obj, "get_id", None)
    if get_id is not None:
        return get_id()
    # Access points have no id; fall back to their SSID.
    return ssid_to_utf8(obj)


def process_ap(nm_ap, is_active, adapter):
    """Activate/Deactivate a connection and get password if required

    Args: nm_ap - the active connection when is_active is True, otherwise
                  the NM.AccessPoint to connect to
          is_active - boolean
          adapter - wifi device to use

    """
    if is_active:
        CLIENT.deactivate_connection_async(nm_ap, None, deactivate_cb, nm_ap)
        LOOP.run()
        return

    conns_cur = [i for i in CONNS if
                 i.get_setting_wireless() is not None and
                 i.get_setting_wireless().get_mac_address() ==
                 adapter.get_permanent_hw_address()]
    con = nm_ap.filter_connections(conns_cur)
    if len(con) > 1:
        raise ValueError("There are multiple connections possible")

    if len(con) == 1:
        CLIENT.activate_connection_async(con[0], adapter, nm_ap.get_path(),
                                         None, activate_cb, nm_ap)
        LOOP.run()
        return

    if ap_security(nm_ap) != "--":
        password = get_passphrase()
    else:
        password = ""
    # set_new_connection() runs the main loop itself; running it a second
    # time here would block forever because no callback is left to quit it.
    set_new_connection(nm_ap, password, adapter)


def activate_cb(dev, res, data):
    """Notification if activate connection completed successfully

    Args: dev - NM.Client that started the activation
          res - async result
          data - object that was activated (connection or access point)

    """
    try:
        try:
            conn = dev.activate_connection_finish(res)
        except GLib.Error:
            conn = None
        if conn is not None:
            notify("Activated {}".format(conn.get_id()))
        else:
            notify("Problem activating {}".format(_display_name(data)),
                   urgency="critical")
    finally:
        LOOP.quit()


def deactivate_cb(dev, res, data):
    """Notification if deactivate connection completed successfully

    Args: dev - NM.Client that started the deactivation
          res - async result
          data - the connection that was deactivated

    """
    try:
        try:
            succeeded = dev.deactivate_connection_finish(res) is True
        except GLib.Error:
            # Failure is reported as an exception, not as False.
            succeeded = False
        if succeeded:
            notify("Deactivated {}".format(_display_name(data)))
        else:
            notify("Problem deactivating {}".format(_display_name(data)),
                   urgency="critical")
    finally:
        LOOP.quit()


def process_vpngsm(con, activate):
    """Activate/deactivate VPN or GSM connections"""
    if activate:
        CLIENT.activate_connection_async(con, None, None,
                                         None, activate_cb, con)
    else:
        CLIENT.deactivate_connection_async(con, None, deactivate_cb, con)
    LOOP.run()


def create_ap_actions(aps, active_ap, active_connection, adapter):  # pylint: disable=too-many-locals
    """For each AP in a list, create the string and its attached function
    (activate/deactivate)

    Args: aps - list of access points
          active_ap - the active access point or None
          active_connection - the active connection of active_ap or None
          adapter - wifi device the access points belong to
    Returns: list of Actions

    """
    active_ap_bssid = active_ap.get_bssid() if active_ap is not None else ""

    names = [ssid_to_utf8(ap) for ap in aps]
    max_len_name = max((len(name) for name in names), default=0)
    secs = [ap_security(ap) for ap in aps]
    max_len_sec = max((len(sec) for sec in secs), default=0)

    # Configuration does not change while the list is being built.
    wifi_chars = CONF.get("dmenu", "wifi_chars", fallback=False)
    compact = CONF.getboolean("dmenu", "compact", fallback=False)

    ap_actions = []

    for nm_ap, name, sec in zip(aps, names, secs):
        bars = NM.utils_wifi_strength_bars(nm_ap.get_strength())
        if wifi_chars:
            # Replace every '*' by the configured character for its position.
            bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == '*'])
        is_active = nm_ap.get_bssid() == active_ap_bssid
        if compact:
            action_name = u"{} {} {}".format(name, sec, bars)
        else:
            action_name = u"{:<{}s} {:<{}s} {}".format(name, max_len_name, sec,
                                                       max_len_sec, bars)
        if is_active:
            ap_actions.append(Action(action_name, process_ap,
                                     [active_connection, True, adapter],
                                     active=True))
        else:
            ap_actions.append(Action(action_name, process_ap,
                                     [nm_ap, False, adapter]))
    return ap_actions


def create_vpn_actions(vpns, active):
    """Create the list of strings to display with associated function
    (activate/deactivate) for VPN connections.

    """
    active_vpns = [i for i in active if i.get_vpn()]
    return _create_vpngsm_actions(vpns, active_vpns, "VPN")


def create_wireguard_actions(wgs, active):
    """Create the list of strings to display with associated function
    (activate/deactivate) for Wireguard connections.

    """
    active_wgs = [i for i in active if i.get_connection_type() == "wireguard"]
    return _create_vpngsm_actions(wgs, active_wgs, "Wireguard")


def create_eth_actions(eths, active):
    """Create the list of strings to display with associated function
    (activate/deactivate) for Ethernet connections.

    """
    active_eths = [i for i in active if 'ethernet' in i.get_connection_type()]
    return _create_vpngsm_actions(eths, active_eths, "Eth")


def create_gsm_actions(gsms, active):
    """Create the list of strings to display with associated function
    (activate/deactivate) GSM connections."""
    active_gsms = [i for i in active if
                   i.get_connection() is not None and
                   i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)]
    return _create_vpngsm_actions(gsms, active_gsms, "GSM")


def create_blue_actions(blues, active):
    """Create the list of strings to display with associated function
    (activate/deactivate) Bluetooth connections."""
    active_blues = [i for i in active if
                    i.get_connection() is not None and
                    i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]
    return _create_vpngsm_actions(blues, active_blues, "Bluetooth")


def create_saved_actions(saved):
    """Create the list of strings to display with associated function
    (activate) for saved connections.

    """
    return _create_vpngsm_actions(saved, [], "SAVED")


def _create_vpngsm_actions(cons, active_cons, label):
    """Create one Action per connection, named "<id>:<label>".

    Args: cons - list of connections
          active_cons - list of active connections of the same kind
          label - text appended to the connection id in the menu
    Returns: list of Actions; selecting an active connection deactivates it,
             selecting any other connection activates it
    Raises: ValueError if a connection id matches several active connections

    """
    active_con_ids = [a.get_id() for a in active_cons]
    actions = []
    for con in cons:
        is_active = con.get_id() in active_con_ids
        action_name = u"{}:{}".format(con.get_id(), label)
        if is_active:
            active_connection = [a for a in active_cons
                                 if a.get_id() == con.get_id()]
            if len(active_connection) != 1:
                raise ValueError(u"Multiple active connections match"
                                 " the connection: {}".format(con.get_id()))
            active_connection = active_connection[0]

            actions.append(Action(action_name, process_vpngsm,
                                  [active_connection, False], active=True))
        else:
            actions.append(Action(action_name, process_vpngsm,
                                  [con, True]))
    return actions


def create_wwan_actions(client):
    """Create WWAN actions

    """
    wwan_enabled = client.wwan_get_enabled()
    wwan_action = "Disable" if wwan_enabled else "Enable"
    return [Action("{} WWAN".format(wwan_action), toggle_wwan, not wwan_enabled)]


def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved):
    """Combine all given actions into a list of actions.

    Non-empty groups are followed by an empty separator entry unless the
    "compact" option is set. GSM actions are only listed together with the
    WWAN actions.

    Args: args - eths: list of Actions
                 aps: list of Actions
                 vpns: list of Actions
                 wgs: list of Actions
                 gsms: list of Actions
                 blues: list of Actions
                 wwan: list of Actions
                 others: list of Actions
                 saved: list of Actions
    Returns: list of Actions

    """
    compact = CONF.getboolean("dmenu", "compact", fallback=False)
    empty_action = [Action('', None)] if not compact else []
    groups = [eths, aps, vpns, wgs, gsms if wwan else [], blues, wwan,
              others, saved]
    all_actions = []
    for group in groups:
        if group:
            all_actions += group + empty_action
    return all_actions


def get_selection(all_actions):
    """Spawn dmenu for selection and return the associated action.

    Exits the script if nothing was selected.

    Args: all_actions - list of Actions
    Returns: the selected Action

    """
    rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False)

    if rofi_highlight is True:
        inp = [str(action) for action in all_actions]
    else:
        # Without highlighting, active entries are marked with a prefix.
        # NOTE(review): the inactive prefix may originally have been padded
        # to the width of '== ' - confirm against the upstream file.
        inp = [('== ' if action.is_active else ' ') + str(action)
               for action in all_actions]
    active_lines = [index for index, action in enumerate(all_actions)
                    if action.is_active]

    inp_bytes = "\n".join(inp).encode(ENC)
    command = dmenu_cmd(len(inp), active_lines=active_lines)
    sel = Popen(command, stdin=PIPE, stdout=PIPE,
                env=ENV).communicate(input=inp_bytes)[0].decode(ENC)

    if not sel.rstrip():
        sys.exit()

    if rofi_highlight is False:
        action = [i for i in all_actions
                  if ((str(i).strip() == str(sel.strip())
                       and not i.is_active) or
                      ('== ' + str(i) == str(sel.rstrip('\n'))
                       and i.is_active))]
    else:
        action = [i for i in all_actions if str(i).strip() == sel.strip()]
    assert len(action) == 1, \
        u"Selection was ambiguous: '{}'".format(str(sel.strip()))
    return action[0]


def toggle_networking(enable):
    """Enable/disable networking

    Args: enable - boolean

    """
    toggle = GLib.Variant.new_tuple(GLib.Variant.new_boolean(enable))
    try:
        CLIENT.dbus_call(NM.DBUS_PATH, NM.DBUS_INTERFACE, "Enable", toggle,
                         None, -1, None, None, None)
    except AttributeError:
        # Workaround for older versions of python-gobject
        CLIENT.networking_set_enabled(enable)
    notify("Networking {}".format("enabled" if enable is True else "disabled"))


def toggle_wifi(enable):
    """Enable/disable Wifi

    Args: enable - boolean

    """
    toggle = GLib.Variant.new_boolean(enable)
    try:
        CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WirelessEnabled", toggle,
                                 -1, None, None, None)
    except AttributeError:
        # Workaround for older versions of python-gobject
        CLIENT.wireless_set_enabled(enable)
    notify("Wifi {}".format("enabled" if enable is True else "disabled"))


def toggle_wwan(enable):
    """Enable/disable WWAN

    Args: enable - boolean

    """
    toggle = GLib.Variant.new_boolean(enable)
    try:
        CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle,
                                 -1, None, None, None)
    except AttributeError:
        # Workaround for older versions of python-gobject
        CLIENT.wwan_set_enabled(enable)
    notify("Wwan {}".format("enabled" if enable is True else "disabled"))


def toggle_bluetooth(enable):
    """Enable/disable Bluetooth

    Args: enable - boolean

    References:
    https://github.com/blueman-project/blueman/blob/master/blueman/plugins/mechanism/RfKill.py
    https://www.kernel.org/doc/html/latest/driver-api/rfkill.html
    https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/rfkill.h?h=v5.8.9

    """
    # Field values of the rfkill event written to /dev/rfkill; see the
    # references above for their meaning.
    type_bluetooth = 2
    op_change_all = 3
    idx = 0
    soft_state = 0 if enable else 1
    hard_state = 0

    data = struct.pack("IBBBB", idx, type_bluetooth, op_change_all,
                       soft_state, hard_state)

    try:
        with open('/dev/rfkill', 'r+b', buffering=0) as rff:
            rff.write(data)
    except PermissionError:
        notify("Lacking permission to write to /dev/rfkill.",
               "Maybe you need to add your user to the 'rfkill' group?",
               urgency="critical")
    else:
        notify("Bluetooth {}".format("enabled" if enable else "disabled"))


def launch_connection_editor():
    """Launch nmtui or the gui nm-connection-editor

    With "gui_if_available" set (the default) gnome-control-center is tried
    first, then nm-connection-editor, then nmtui in the configured terminal.

    """
    terminal = CONF.get("editor", "terminal", fallback="xterm")
    gui_if_available = CONF.getboolean("editor", "gui_if_available", fallback=True)
    if gui_if_available is True:
        try:
            Popen(["gnome-control-center", "network"]).communicate()
        except OSError:
            try:
                Popen(["nm-connection-editor"]).communicate()
            except OSError:
                Popen([terminal, "-e", "nmtui"]).communicate()
    else:
        Popen([terminal, "-e", "nmtui"]).communicate()


def get_passphrase():
    """Get a password

    Uses the configured pinentry program if there is one, dmenu otherwise.

    Returns: string

    """
    pinentry = CONF.get("dmenu", "pinentry", fallback=None)
    if pinentry:
        pin = ""
        out = Popen(pinentry,
                    stdout=PIPE,
                    stdin=PIPE).communicate(input=b'setdesc Get network password\ngetpin\n')[0]
        if out:
            # The pin is on the data line, which starts with "D ". Strip only
            # that prefix so passphrases containing "D " stay intact, and do
            # not rely on the data line being at a fixed position.
            for line in out.decode(ENC).split("\n"):
                if line.startswith("D "):
                    pin = line[2:]
                    break
        return pin
    return Popen(dmenu_cmd(0, "Passphrase"),
                 stdin=PIPE, stdout=PIPE).communicate()[0].decode(ENC)


def delete_connection():
    """Display list of NM connections and delete the selected one

    """
    conn_acts = [Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS]
    conn_names = "\n".join([str(i) for i in conn_acts]).encode(ENC)
    sel = Popen(dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"),
                stdin=PIPE,
                stdout=PIPE,
                env=ENV).communicate(input=conn_names)[0].decode(ENC)
    if not sel.strip():
        sys.exit()
    action = [i for i in conn_acts if str(i) == sel.rstrip("\n")]
    assert len(action) == 1, u"Selection was ambiguous: {}".format(str(sel))
    action[0]()
    LOOP.run()


def delete_cb(dev, res, data):
    """Notification if delete completed successfully

    Args: dev - the connection being deleted
          res - async result
          data - unused

    """
    try:
        try:
            succeeded = dev.delete_finish(res) is True
        except GLib.Error:
            # Failure is reported as an exception, not as False.
            succeeded = False
        if succeeded:
            notify("Deleted {}".format(dev.get_id()))
        else:
            notify("Problem deleting {}".format(dev.get_id()), urgency="critical")
    finally:
        LOOP.quit()


def set_new_connection(nm_ap, nm_pw, adapter):
    """Setup a new NetworkManager connection

    Blocks in the main loop until verify_conn has handled the result.

    Args: nm_ap - NM.AccessPoint
          nm_pw - string
          adapter - wifi device to use

    """
    nm_pw = str(nm_pw).strip()
    profile = create_wifi_profile(nm_ap, nm_pw, adapter)
    CLIENT.add_and_activate_connection_async(profile, adapter, nm_ap.get_path(),
                                             None, verify_conn, profile)
    LOOP.run()


def create_wifi_profile(nm_ap, password, adapter):
    # pylint: disable=C0301
    # From https://cgit.freedesktop.org/NetworkManager/NetworkManager/tree/examples/python/gi/add_connection.py
    # and https://cgit.freedesktop.org/NetworkManager/NetworkManager/tree/examples/python/dbus/add-wifi-psk-connection.py
    # pylint: enable=C0301
    """Create the NM profile given the AP and passphrase

    Args: nm_ap - NM.AccessPoint
          password - string, ignored for open networks
          adapter - wifi device the profile is bound to (by MAC address)
    Returns: NM.SimpleConnection

    """
    ap_sec = ap_security(nm_ap)
    profile = NM.SimpleConnection.new()

    s_con = NM.SettingConnection.new()
    s_con.set_property(NM.SETTING_CONNECTION_ID, ssid_to_utf8(nm_ap))
    s_con.set_property(NM.SETTING_CONNECTION_UUID, str(uuid.uuid4()))
    s_con.set_property(NM.SETTING_CONNECTION_TYPE, "802-11-wireless")
    profile.add_setting(s_con)

    s_wifi = NM.SettingWireless.new()
    s_wifi.set_property(NM.SETTING_WIRELESS_SSID, nm_ap.get_ssid())
    s_wifi.set_property(NM.SETTING_WIRELESS_MODE, 'infrastructure')
    s_wifi.set_property(NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address())
    profile.add_setting(s_wifi)

    s_ip4 = NM.SettingIP4Config.new()
    s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
    profile.add_setting(s_ip4)

    s_ip6 = NM.SettingIP6Config.new()
    s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
    profile.add_setting(s_ip6)

    if ap_sec != "--":
        s_wifi_sec = NM.SettingWirelessSecurity.new()
        if "WPA" in ap_sec:
            s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
                                    "wpa-psk")
            s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG,
                                    "open")
            s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, password)
        elif "WEP" in ap_sec:
            # NetworkManager's key-mgmt value for WEP is lowercase "none".
            s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
                                    "none")
            s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE,
                                    NM.WepKeyType.PASSPHRASE)
            s_wifi_sec.set_wep_key(0, password)
        profile.add_setting(s_wifi_sec)

    return profile


def verify_conn(client, result, data):
    """Callback function for add_and_activate_connection_async

    Check if connection completes successfully. Delete the connection if there
    is an error.

    Args: client - NM.Client
          result - async result
          data - the profile that was submitted

    """
    conn = None
    try:
        act_conn = client.add_and_activate_connection_finish(result)
        conn = act_conn.get_connection()
        if conn is None or not all([conn.verify(),
                                    conn.verify_secrets(),
                                    data.verify(),
                                    data.verify_secrets()]):
            raise GLib.Error
        notify("Added {}".format(conn.get_id()))
    except GLib.Error:  # pylint: disable=catching-non-exception
        # Without a created connection, fall back to the submitted profile
        # for the name; there is nothing to delete in that case.
        failed = conn if conn is not None else data
        notify("Connection to {} failed".format(failed.get_id()),
               urgency="critical")
        if conn is not None:
            conn.delete_async(None, None, None)
    finally:
        LOOP.quit()


def create_ap_list(adapter, active_connections):
    """Generate list of access points. Remove duplicate APs, keeping strongest
    ones and the active AP

    Args: adapter
          active_connections - list of all active connections
    Returns: aps - list of access points
             active_ap - active AP
             active_ap_con - active Connection
             adapter
    Raises: ValueError if several active connections match the active AP

    """
    aps = []
    ap_names = []
    active_ap = adapter.get_active_access_point()
    # Strongest first, so the first AP seen for an SSID is the one kept.
    aps_all = sorted(adapter.get_access_points(),
                     key=lambda a: a.get_strength(), reverse=True)
    conns_cur = [i for i in CONNS if
                 i.get_setting_wireless() is not None and
                 i.get_setting_wireless().get_mac_address() ==
                 adapter.get_permanent_hw_address()]
    try:
        ap_conns = active_ap.filter_connections(conns_cur)
        active_ap_name = ssid_to_utf8(active_ap)
        active_ap_con = [active_conn for active_conn in active_connections
                         if active_conn.get_connection() in ap_conns]
    except AttributeError:
        # No active access point (active_ap is None).
        active_ap_name = None
        active_ap_con = []
    if len(active_ap_con) > 1:
        raise ValueError("Multiple connection profiles match"
                         " the wireless AP")
    active_ap_con = active_ap_con[0] if active_ap_con else None
    for nm_ap in aps_all:
        ap_name = ssid_to_utf8(nm_ap)
        if nm_ap != active_ap and ap_name == active_ap_name:
            # Skip adding AP if it's not active but same name as active AP
            continue
        if ap_name not in ap_names:
            ap_names.append(ap_name)
            aps.append(nm_ap)
    return aps, active_ap, active_ap_con, adapter


def notify(message, details=None, urgency="low"):
    """Use notify-send if available for notifications

    Args: message - summary text
          details - optional body text
          urgency - urgency level passed to notify-send's -u option

    """
    args = ["-u", urgency, message]
    if details is not None:
        args.append(details)

    try:
        Popen(["notify-send"] + args,
              stdout=PIPE, stderr=PIPE).communicate()
    except FileNotFoundError:
        # notify-send is optional; without it notifications are dropped.
        pass


def run():
    """Main script entrypoint"""
    active = CLIENT.get_active_connections()
    adapter = choose_adapter(CLIENT)
    if adapter:
        ap_actions = create_ap_actions(*create_ap_list(adapter, active))
    else:
        ap_actions = []

    vpns = [i for i in CONNS if i.is_type(NM.SETTING_VPN_SETTING_NAME)]
    try:
        wgs = [i for i in CONNS if i.is_type(NM.SETTING_WIREGUARD_SETTING_NAME)]
    except AttributeError:
        # Workaround for older versions of python-gobject with no wireguard support
        wgs = []
    eths = [i for i in CONNS if i.is_type(NM.SETTING_WIRED_SETTING_NAME)]
    blues = [i for i in CONNS if i.is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]

    vpn_actions = create_vpn_actions(vpns, active)
    wg_actions = create_wireguard_actions(wgs, active)
    eth_actions = create_eth_actions(eths, active)
    blue_actions = create_blue_actions(blues, active)
    other_actions = create_other_actions(CLIENT)
    wwan_installed = is_modemmanager_installed()
    if wwan_installed:
        gsms = [i for i in CONNS if i.is_type(NM.SETTING_GSM_SETTING_NAME)]
        gsm_actions = create_gsm_actions(gsms, active)
        wwan_actions = create_wwan_actions(CLIENT)
    else:
        gsm_actions = []
        wwan_actions = []

    list_saved = CONF.getboolean('dmenu', 'list_saved', fallback=False)
    # Everything that is not already listed in one of the groups above.
    categorized = vpns + wgs + eths + blues
    saved_cons = [i for i in CONNS if i not in categorized]
    if list_saved:
        saved_actions = create_saved_actions(saved_cons)
    else:
        saved_actions = [Action("Saved connections", prompt_saved, [saved_cons])]

    actions = combine_actions(eth_actions, ap_actions, vpn_actions, wg_actions,
                              gsm_actions, blue_actions, wwan_actions,
                              other_actions, saved_actions)
    sel = get_selection(actions)
    sel()


if __name__ == '__main__':
    run()

# vim: set et ts=4 sw=4 :