from traceback import print_exc from math import ceil from xray_config import xray_manager_config, xray_local_config, xray_config from binascii import a2b_base64 from re import match from urllib.parse import unquote_plus, parse_qs from urllib import request import sys import json from os import path as _pth CURRENT_DIR = _pth.abspath(_pth.dirname(__file__)) sys.path.append(CURRENT_DIR) def fill_padding(base64_encode_str): need_padding = len(base64_encode_str) % 4 != 0 if need_padding: missing_padding = 4 - need_padding base64_encode_str += '=' * missing_padding return base64_encode_str def base64_decode(base64_encode_str): base64_encode_str = base64_encode_str.replace("-", "+").replace("_", "/") base64_encode_str = fill_padding(base64_encode_str) return a2b_base64(base64_encode_str).decode('utf8') def parse_format_sip002(line): b64data = match(r"^ss:\/\/(.+?)@(.+):(.+?)(?:[\/\?].*?)?(#(.+))?$", line) if b64data == None: return None account_info_b46 = b64data[1] enc, passwd = base64_decode(account_info_b46).split(":") server_addr = b64data[2] server_port = b64data[3] remarks = b64data[5].strip() remarks = server_addr if remarks == None else unquote_plus( remarks, "utf8", errors='ignore') return { "protocol": "shadowsocks", "settings": { "servers": [ { "address": server_addr, "port": int(server_port), "password": passwd, "method": enc } ] }, "remarks": remarks, } def parse_format_sip002_like_protocol(line): mt = match( r"^(\w+):\/\/(.+?)@(.+):(.+?)(?:(?:[\/\?]{1,2})(.*?))?(?:#(.+))?$", line) if mt == None: return None protocol = mt[1] uuid = mt[2] server_addr = mt[3] server_port = mt[4] query_string = mt[5] try: cfg = parse_qs(query_string, strict_parsing=True) except: return None remarks = mt[6].strip() if mt[6] else None remarks = server_addr if remarks == None else unquote_plus( remarks, "utf8", errors='ignore') server = { "remarks": remarks, "protocol": protocol.lower(), } settings = { "address": server_addr, "port": int(server_port), } match server["protocol"]: case "vless": settings["id"] = uuid settings["encryption"] = cfg.get("encryption", ["none"])[0] settings["flow"] = cfg.get("flow", [""])[0] settings["level"] = int(cfg.get("level", ["0"])[0]) case "vmess": settings["id"] = uuid settings["security"] = "auto" settings["level"] = int(cfg.get("level", ["0"])[0]) case "hysteria2": server["protocol"] = "hysteria" settings["version"] = 2 cfg.setdefault("type", ["hysteria"]) # default network hysteria case _: print(f"protocol {server['protocol']} not support:", line) return None server["settings"] = settings stream_settings = {} match cfg.get("type", ["raw"])[0]: case "tcp" | "raw": stream_settings["network"] = "raw" stream_settings["rawSettings"] = {} if "headerType" in cfg: headerType = cfg["headerType"][0] match headerType: case "none": stream_settings["rawSettings"]["header"] = { "type": "none"} case _: print(f"headerType {headerType} not support:", line) return None case "ws": stream_settings["network"] = "ws" stream_settings["wsSettings"] = {} stream_settings["wsSettings"]["heartbeatPeriod"] = 20 if "host" in cfg: stream_settings["wsSettings"]["host"] = cfg["host"][0] if "path" in cfg: stream_settings["wsSettings"]["path"] = cfg["path"][0] if "headerType" in cfg and cfg["headerType"][0] != "none": print(f"headerType {cfg['headerType'][0]} not support:", line) return None case "hysteria": del cfg["type"] stream_settings["network"] = "hysteria" stream_settings["hysteriaSettings"] = {} stream_settings["hysteriaSettings"]["auth"] = uuid stream_settings["hysteriaSettings"]["version"] = 2 # default security hysteria cfg.setdefault("security", ["hysteria"]) case _: print( f"Streaming type {cfg.get('type', ['tcp'])[0]} not support:", line) return None match cfg.get("security", ["none"])[0]: case "tls": stream_settings["security"] = "tls" stream_settings["tlsSettings"] = { "serverName": cfg.get("sni", [server_addr])[0], "allowInsecure": False, "fingerprint": cfg.get("fp", ["chrome"])[0], } case "reality": stream_settings["security"] = "reality" stream_settings["realitySettings"] = { "serverName": cfg.get("sni", [server_addr])[0], "fingerprint": cfg.get("fp", ["chrome"])[0], "password": cfg.get("pbk", [""])[0], "shortId": cfg.get("sid", [""])[0], } case "none": stream_settings["security"] = "none" case "hysteria": del cfg["security"] stream_settings["security"] = "tls" stream_settings["tlsSettings"] = { "serverName": cfg.get("sni", [server_addr])[0], "alpn": ["h3"], } if "insecure" in cfg and cfg["insecure"][0] == "1": stream_settings["tlsSettings"]["allowInsecure"] = True stream_settings["finalmask"] = { "quicParams": {}, } if "mport" in cfg: stream_settings["finalmask"]["quicParams"]["udpHop"] = { "ports": cfg["mport"][0] } case _: print( f"Streaming security {cfg.get('security', ['none'])[0]} not support:", line) return None server["streamSettings"] = stream_settings return server def parse_format_encoded_json_url(line): b64data = match(r"^(\w+?):\/\/(.*)$", line) if b64data == None: return None protocol = b64data[1] b64data = b64data[2] try: data = base64_decode(b64data) server_json = json.loads(data) except: return None if protocol.lower() == "vless": user = { "id": server_json["id"], "encryption": "none", } else: user = { "id": server_json["id"], "alterId": int(server_json["aid"]), "security": "auto", } stream = { "network": server_json["net"], } if server_json["net"].lower() == "tcp": if server_json["type"] == "http": stream["tcpSettings"] = { "header": { "type": "http", "request": { "headers": { "Host": server_json["host"].split(","), }, }, } } else: stream["tcpSettings"] = {} elif server_json["net"].lower() == "ws": stream["wsSettings"] = { "path": server_json["path"], "headers": { "Host": server_json["host"] }, } elif server_json["net"].lower() == "h2": stream["httpSettings"] = { "host": [server_json["host"]], "path": server_json["path"], } elif server_json["net"].lower() == "grpc": stream["grpcSettings"] = { "serviceName": server_json["path"], } else: print(f"net {server_json['net'].lower()} not support:", server_json) return None if "tls" in server_json and server_json["tls"] == "tls": stream["security"] = "tls" stream["tlsSettings"] = { "serverName": server_json["sni"] } else: stream["security"] = "none" server = { "protocol": protocol, "settings": { "vnext": [ { "address": server_json["add"], "port": int(server_json["port"]), "users": [ user, ], }, ], }, "streamSettings": stream, "mux": { "enabled": False, "concurrency": 8, }, } server["remarks"] = server_json["ps"] return server def get_servers_from_subscribe_url(url): servers = [] try: req = request.Request(url, headers={ "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.7,en;q=0.3", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "cross-site", "Priority": "u=0, i", "TE": "trailers", }, method="GET") data = request.urlopen(req, timeout=5.0).read() try: text = a2b_base64(data).decode("utf8") except: text = data.decode("utf8") except: print_exc() return servers for line in text.split("\n"): try: server = parse_format_sip002(line) server = parse_format_sip002_like_protocol( line) if server == None else server server = parse_format_encoded_json_url( line) if server == None else server if server != None: servers.append(server) except: print(line) print_exc(limit=2) return servers def main(): loop = True while loop: try: print("<==============================:") print(": What do you want to do?") print(": 1. Add a subscribe url.") print(": 2. Remove a subscribe url.") print(": 3. List all subscribe urls.") print(": 4. Update servers from subscribe urls.") print(": 5. Select the server from exists one.") print(": q. Quit.") func_sel = input(": Input a number to continue: ") print(":==============================>") if func_sel == "1": url = input("> Input a url to add: ") xray_manager_config.setdefault("subscribe", []).append(url) xray_manager_config.commit() elif func_sel == "2": i = int(input("> Input the index to remove: ")) if "subscribe" in xray_manager_config and len(xray_manager_config["subscribe"]) > i and i >= 0: del xray_manager_config["subscribe"][i] xray_manager_config.commit() elif func_sel == "3": subscribes = xray_manager_config.setdefault("subscribe", []) for i in range(len(subscribes)): print("{:< 4d}: {}".format(i, subscribes[i])) if len(subscribes) <= 0: print("Empty subscribe urls.") elif func_sel == "4": servers = [] for url in xray_manager_config.setdefault("subscribe", []): print("Fetching {}...".format(url)) remote_servers = get_servers_from_subscribe_url(url) print("Added {} server(s)".format(len(remote_servers))) servers.extend(remote_servers) xray_manager_config["servers"] = servers xray_manager_config.commit() elif func_sel == "5": servers = xray_manager_config.setdefault("servers", []) page_size = len(servers) page_count = ceil(page_size / 5) pages = 0 loop2 = True while loop2: print("q : Give up.") print("<<<<: Last page. '<' means 1 page, '<<' means 2 pages.") print(">>>>: Next page. '>' means 1 page, '>>' means 2 pages.") for i in range(pages*5, pages*5+5): if i >= page_size: break print("{:< 4d}: {}".format(i, servers[i]["remarks"])) print(": PAGE {}/{}".format(pages+1, page_count)) num_sel = input( "> Please input a number or operators to continue: ") if num_sel.startswith("<"): pages -= num_sel.count("<") if pages < 0: pages = 0 elif num_sel.startswith(">"): pages += num_sel.count(">") if pages >= page_count: pages = page_count - 1 elif num_sel.lower() == "q": loop2 = False else: index = -1 try: index = int(num_sel) except: pass if index >= 0 and index < page_size: server = servers[index] out = {} for k in server: if k in ["remarks"]: continue out[k] = server[k] xray_config.clear() for k in xray_local_config: xray_config[k] = xray_local_config[k] out_list = [out] if "outbounds" in xray_config: out_list.extend(xray_config["outbounds"]) xray_config["outbounds"] = out_list xray_config.commit() loop2 = False elif func_sel.lower() == "q": loop = False except KeyboardInterrupt: loop = False except: print_exc() print() if __name__ == "__main__": main() # debug # get_servers_from_subscribe_url("")