v2xray/xray_manager.py
2025-02-12 15:46:41 +08:00

265 lines
9.7 KiB
Python

import json
from os import path as _pth
CURRENT_DIR = _pth.abspath(_pth.dirname(__file__))
import sys
sys.path.append(CURRENT_DIR)
from urllib import request
from urllib.parse import unquote_plus
from re import match
from binascii import a2b_base64
from xray_config import xray_manager_config, xray_local_config, xray_config
from math import ceil
from traceback import print_exc
def fill_padding(base64_encode_str):
    """Return *base64_encode_str* padded with ``=`` to a multiple of four.

    Base64 text found in share links frequently has its trailing ``=``
    characters stripped; this restores exactly as many as are missing.

    Args:
        base64_encode_str: Base64 text (``str``) with or without padding.

    Returns:
        The input followed by however many ``=`` characters are needed to
        bring its length up to the next multiple of four; unchanged when the
        length is already a multiple of four.
    """
    # -len % 4 is the distance to the next multiple of four (0 when aligned).
    missing_padding = -len(base64_encode_str) % 4
    return base64_encode_str + "=" * missing_padding
def base64_decode(base64_encode_str):
    """Decode standard or URL-safe base64 text into a UTF-8 string.

    Args:
        base64_encode_str: Base64 text; ``-`` and ``_`` are accepted in place
            of ``+`` and ``/``, and trailing padding may be missing.

    Returns:
        The decoded bytes interpreted as UTF-8 text.
    """
    # Map the URL-safe alphabet onto the standard one before decoding.
    standard_alphabet = base64_encode_str.translate(str.maketrans("-_", "+/"))
    raw_bytes = a2b_base64(fill_padding(standard_alphabet))
    return raw_bytes.decode("utf8")
def parse_format_sip002(line):
    """Parse a SIP002-style ``ss://`` share link into an outbound dict.

    The accepted shape is
    ``ss://<base64(method:password)>@<host>:<port>[/...|?...][#<remarks>]``.

    Args:
        line: One line of subscription text.

    Returns:
        A dict with ``"protocol"`` (``"shadowsocks"``), ``"settings"`` and
        ``"remarks"`` keys, or ``None`` when *line* does not match the
        ``ss://`` pattern. When the link carries no ``#remarks`` fragment the
        server address is used as the remark.

    Raises:
        ValueError: If the decoded user info has no ``:`` separator or the
            port is not an integer. Errors raised by ``base64_decode`` are
            propagated unchanged.
    """
    parsed = match(r"^ss:\/\/(.+?)@(.+):(.+?)(?:[\/\?].*?)?(#(.+))?$", line)
    if parsed is None:
        return None

    # Split on the first ":" only: the password itself may contain colons.
    enc, passwd = base64_decode(parsed[1]).split(":", 1)
    server_addr = parsed[2]
    server_port = parsed[3]

    # Group 5 is None when the link has no "#remarks" fragment, so it must be
    # checked before any string method is called on it.
    raw_remarks = parsed[5]
    if raw_remarks is None:
        remarks = server_addr
    else:
        remarks = unquote_plus(raw_remarks.strip(), "utf8", errors="ignore")

    return {
        "protocol": "shadowsocks",
        "settings": {
            "servers": [
                {
                    "address": server_addr,
                    "port": int(server_port),
                    "password": passwd,
                    "method": enc,
                },
            ],
        },
        "remarks": remarks,
    }
def parse_format_default(line):
    """Parse a ``<protocol>://<base64(JSON)>`` share link into an outbound dict.

    The JSON object is read for the keys ``id``, ``aid``, ``net``, ``type``,
    ``host``, ``path``, ``tls``, ``sni``, ``add``, ``port`` and ``ps``.
    ``id``, ``net``, ``add`` and ``port`` are required; the others are
    optional and fall back to an empty string (``aid`` to ``0``, ``ps`` to
    the server address) so that links omitting them are still accepted.

    Args:
        line: One line of subscription text.

    Returns:
        A dict with ``"protocol"``, ``"settings"``, ``"streamSettings"``,
        ``"mux"`` and ``"remarks"`` keys, or ``None`` when *line* has no
        ``scheme://`` prefix or its ``net`` value is not one of ``tcp``,
        ``ws``, ``h2`` or ``grpc``.

    Raises:
        KeyError: If a required JSON key is missing.
        ValueError: If the payload is not valid JSON or ``port``/``aid`` is
            not an integer. Errors raised by ``base64_decode`` are propagated
            unchanged.
    """
    parsed = match(r"^(\w+?):\/\/(.*)$", line)
    if parsed is None:
        return None
    protocol = parsed[1]
    server_json = json.loads(base64_decode(parsed[2]))

    if protocol.lower() == "vless":
        user = {
            "id": server_json["id"],
            "encryption": "none",
        }
    else:
        user = {
            "id": server_json["id"],
            # "aid" may be absent or an empty string; treat both as 0.
            "alterId": int(server_json.get("aid") or 0),
            "security": "auto",
        }

    network = server_json["net"]
    net = network.lower()
    # Optional transport fields: absent or null values become "".
    host = server_json.get("host") or ""
    path = server_json.get("path") or ""
    header_type = (server_json.get("type") or "").lower()

    stream = {
        "network": network,
    }
    if net == "tcp":
        if header_type == "http":
            stream["tcpSettings"] = {
                "header": {
                    "type": "http",
                    "request": {
                        "headers": {
                            "Host": host.split(","),
                        },
                    },
                }
            }
        else:
            stream["tcpSettings"] = {}
    elif net == "ws":
        stream["wsSettings"] = {
            "path": path,
            "headers": {
                "Host": host,
            },
        }
    elif net == "h2":
        stream["httpSettings"] = {
            "host": [host],
            "path": path,
        }
    elif net == "grpc":
        stream["grpcSettings"] = {
            "serviceName": path,
        }
    else:
        print(f"net {net} not support:", server_json)
        return None

    if server_json.get("tls") == "tls":
        stream["security"] = "tls"
        stream["tlsSettings"] = {
            "serverName": server_json.get("sni") or "",
        }
    else:
        stream["security"] = "none"

    address = server_json["add"]
    return {
        "protocol": protocol,
        "settings": {
            "vnext": [
                {
                    "address": address,
                    "port": int(server_json["port"]),
                    "users": [
                        user,
                    ],
                },
            ],
        },
        "streamSettings": stream,
        "mux": {
            "enabled": False,
            "concurrency": 8,
        },
        # Fall back to the address so every entry has a visible label.
        "remarks": server_json.get("ps") or address,
    }
def get_servers_from_subscribe_url(url):
    """Download a subscription and parse each share link it contains.

    The response body is first tried as base64-wrapped UTF-8 text and, if
    that fails, as plain UTF-8 text. Every non-blank line is then offered to
    ``parse_format_sip002`` and, if that returns ``None``, to
    ``parse_format_default``.

    Args:
        url: Subscription URL, passed to ``urllib.request`` unchanged.

    Returns:
        A list of the outbound dicts that parsed successfully. The list is
        empty when the download or decoding fails; the traceback is printed
        in that case. Lines that raise while parsing are printed together
        with their traceback and skipped.
    """
    servers = []
    try:
        # The URL goes to urllib as-is, with no hostname pattern check, so
        # hosts containing "-" or an explicit port are accepted.
        req = request.Request(url, headers={
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.7,en;q=0.3",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "cross-site",
            "Priority": "u=0, i",
            "TE": "trailers",
        }, method="GET")
        # The context manager closes the connection even if read() fails.
        with request.urlopen(req, timeout=5.0) as response:
            data = response.read()
        try:
            text = a2b_base64(data).decode("utf8")
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError
            # subclasses: the body was not base64-wrapped, use it verbatim.
            text = data.decode("utf8")
    except Exception:
        print_exc()
        return servers

    for raw_line in text.split("\n"):
        line = raw_line.strip()
        if not line:
            continue
        try:
            server = parse_format_sip002(line)
            if server is None:
                server = parse_format_default(line)
        except Exception:
            # One malformed entry must not discard the rest of the list.
            print(line)
            print_exc()
            continue
        if server is not None:
            servers.append(server)
    return servers
def _activate_server(server):
    """Rebuild ``xray_config`` with *server* as its first outbound and commit."""
    # "remarks" is excluded from the outbound; every other key is copied.
    outbound = {key: server[key] for key in server if key != "remarks"}
    xray_config.clear()
    for key in xray_local_config:
        xray_config[key] = xray_local_config[key]
    outbounds = [outbound]
    if "outbounds" in xray_config:
        outbounds.extend(xray_config["outbounds"])
    xray_config["outbounds"] = outbounds
    xray_config.commit()


def _select_server():
    """Page through the stored servers and activate the one the user picks."""
    servers = xray_manager_config.setdefault("servers", [])
    total = len(servers)
    if total == 0:
        # Nothing to page through; paging arithmetic below assumes >= 1 page.
        print("No servers available. Update servers from subscribe urls first.")
        return
    per_page = 5
    page_count = ceil(total / per_page)
    page = 0
    while True:
        print("q : Give up.")
        print("<<<<: Last page. '<' means 1 page, '<<' means 2 pages.")
        print(">>>>: Next page. '>' means 1 page, '>>' means 2 pages.")
        first = page * per_page
        for i in range(first, min(first + per_page, total)):
            print("{:< 4d}: {}".format(i, servers[i]["remarks"]))
        print(": PAGE {}/{}".format(page + 1, page_count))
        num_sel = input("> Please input a number or operators to continue: ")
        if num_sel.startswith("<"):
            page = max(page - num_sel.count("<"), 0)
        elif num_sel.startswith(">"):
            page = min(page + num_sel.count(">"), page_count - 1)
        elif num_sel.lower() == "q":
            return
        else:
            try:
                index = int(num_sel)
            except ValueError:
                # Not a number and not an operator: show the page again.
                continue
            if 0 <= index < total:
                _activate_server(servers[index])
                return


def main():
    """Run the interactive text menu until the user quits.

    Menu entries: add, remove and list subscription URLs, refresh the server
    list from all subscriptions, and select one stored server as the active
    outbound. Ctrl-C or end-of-input on stdin ends the loop; any other error
    raised while handling a choice is printed and the menu is shown again.
    """
    running = True
    while running:
        try:
            print("<==============================:")
            print(": What do you want to do?")
            print(": 1. Add a subscribe url.")
            print(": 2. Remove a subscribe url.")
            print(": 3. List all subscribe urls.")
            print(": 4. Update servers from subscribe urls.")
            print(": 5. Select the server from exists one.")
            print(": q. Quit.")
            func_sel = input(": Input a number to continue: ")
            print(":==============================>")
            if func_sel == "1":
                url = input("> Input a url to add: ").strip()
                # An empty entry could never be fetched, so do not store it.
                if url:
                    xray_manager_config.setdefault("subscribe", []).append(url)
                    xray_manager_config.commit()
            elif func_sel == "2":
                i = int(input("> Input the index to remove: "))
                if "subscribe" in xray_manager_config and 0 <= i < len(xray_manager_config["subscribe"]):
                    del xray_manager_config["subscribe"][i]
                    xray_manager_config.commit()
            elif func_sel == "3":
                subscribes = xray_manager_config.setdefault("subscribe", [])
                for i, subscribe_url in enumerate(subscribes):
                    print("{:< 4d}: {}".format(i, subscribe_url))
                if len(subscribes) == 0:
                    print("Empty subscribe urls.")
            elif func_sel == "4":
                servers = []
                for url in xray_manager_config.setdefault("subscribe", []):
                    print("Fetching {}...".format(url))
                    remote_servers = get_servers_from_subscribe_url(url)
                    print("Added {} server(s)".format(len(remote_servers)))
                    servers.extend(remote_servers)
                xray_manager_config["servers"] = servers
                xray_manager_config.commit()
            elif func_sel == "5":
                _select_server()
            elif func_sel.lower() == "q":
                running = False
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed; prompting again would loop forever.
            running = False
        except Exception:
            print_exc()
        print()
# Start the interactive menu only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()