# Listing metadata from the code viewer this file was copied from: 256 lines, 9.2 KiB, Python.
# Standard-library imports.
import json
import sys
from binascii import a2b_base64
from math import ceil
from os import path as _pth
from re import match
from traceback import print_exc
from urllib import request
from urllib.parse import unquote_plus

# Absolute path of the directory that contains this file.  It is appended to
# sys.path before the xray_config import below -- presumably so that module
# resolves from this directory whatever the current working directory is.
CURRENT_DIR = _pth.abspath(_pth.dirname(__file__))
sys.path.append(CURRENT_DIR)

# Must stay after the sys.path change above.
from xray_config import xray_manager_config, xray_local_config, xray_config
|
|
|
|
def fill_padding(base64_encode_str):
    """Return *base64_encode_str* padded with '=' to a multiple of four characters.

    Share links frequently omit the trailing '=' padding.  Whitespace is
    removed before the length is measured: it carries no base64 data and
    would otherwise skew the padding count (for example a trailing '\\r'
    left over from a CRLF-terminated line).

    Args:
        base64_encode_str: base64 text (str), with or without padding.

    Returns:
        The text without whitespace, followed by exactly as many '='
        characters as are needed to reach a length divisible by four.
    """
    compact = "".join(base64_encode_str.split())
    # -len % 4 is 0 when the length is already aligned, else 1..3.
    return compact + "=" * (-len(compact) % 4)
|
|
|
|
def base64_decode(base64_encode_str):
    """Decode base64 text (standard or URL-safe alphabet) into a UTF-8 string.

    The URL-safe characters are mapped back to the standard alphabet and the
    text is passed through fill_padding() before decoding.

    Raises:
        binascii.Error: if a2b_base64 rejects the text.
        UnicodeDecodeError: if the decoded bytes are not valid UTF-8.
    """
    normalized = base64_encode_str
    # URL-safe base64 uses '-' and '_' where the standard alphabet has '+' and '/'.
    for urlsafe_char, standard_char in (("-", "+"), ("_", "/")):
        normalized = normalized.replace(urlsafe_char, standard_char)
    raw_bytes = a2b_base64(fill_padding(normalized))
    return raw_bytes.decode('utf8')
|
|
|
|
def parse_format_sip002(line):
    """Parse an ``ss://`` share link into a shadowsocks outbound dict.

    Accepted shape (see the regular expression below)::

        ss://<base64(method:password)>@<host>:<port>[/...|?...][#<remark>]

    Args:
        line: one line of subscription text.

    Returns:
        A dict with "protocol", "settings" and "remarks" keys, or None when
        *line* does not have the shape above.  "remarks" is the URL-decoded
        fragment, or the server address when the link has no fragment.

    Raises:
        ValueError: if the user-info is not valid base64/UTF-8, does not
            decode to ``method:password``, or the port is not an integer.
    """
    parsed = match(r"^ss:\/\/(.+?)@(.+):(.+?)(?:[\/\?].*?)?(#(.+))?$", line)
    if parsed is None:
        return None

    # Split on the first ':' only, so a password that itself contains ':'
    # stays intact (cipher names are not expected to contain one).
    enc, passwd = base64_decode(parsed[1]).split(":", 1)
    server_addr = parsed[2]
    server_port = parsed[3]

    # Group 5 is None when the link carries no "#remark" fragment; it must be
    # checked before any string method is called on it.
    remarks = parsed[5]
    if remarks is None:
        remarks = server_addr
    else:
        remarks = unquote_plus(remarks.strip(), "utf8", errors='ignore')

    return {
        "protocol": "shadowsocks",
        "settings": {
            "servers": [
                {
                    "address": server_addr,
                    "port": int(server_port),
                    "password": passwd,
                    "method": enc,
                },
            ],
        },
        "remarks": remarks,
    }
|
|
|
|
def parse_format_default(line):
    """Parse a ``<protocol>://<base64(JSON)>`` share link into an outbound dict.

    The decoded JSON object must contain "id", "net", "add" and "port".
    The keys "aid", "type", "host", "path", "tls", "sni" and "ps" are treated
    as optional, so a link that omits them is still accepted instead of
    failing with KeyError.
    NOTE(review): the key names look like the v2rayN share-link layout --
    confirm against the subscriptions actually in use.

    Args:
        line: one line of subscription text.

    Returns:
        A dict with "protocol", "settings", "streamSettings", "mux" and
        "remarks" keys, or None when *line* is not of the form
        ``scheme://data`` or names a transport ("net") other than tcp, ws,
        h2 or grpc.  "remarks" falls back to the server address when "ps" is
        missing or empty.

    Raises:
        ValueError: if the payload is not valid base64/UTF-8/JSON, or a
            numeric field cannot be converted to int.
        KeyError: if one of the required keys is missing.
    """
    parsed = match(r"^(\w+?):\/\/(.*)$", line)
    if parsed is None:
        return None

    protocol = parsed[1]
    server_json = json.loads(base64_decode(parsed[2]))

    address = server_json["add"]
    network = server_json["net"]
    net = network.lower()
    # Optional fields: missing or null values collapse to an empty string.
    host = server_json.get("host") or ""
    path = server_json.get("path") or ""

    if protocol.lower() == "vless":
        user = {
            "id": server_json["id"],
            "encryption": "none",
        }
    else:
        user = {
            "id": server_json["id"],
            # "aid" may be absent, empty, a string or a number.
            "alterId": int(server_json.get("aid") or 0),
            "security": "auto",
        }

    stream = {
        "network": network,
    }
    if net == "tcp":
        header_type = (server_json.get("type") or "none").lower()
        if header_type == "http":
            stream["tcpSettings"] = {
                "header": {
                    "type": "http",
                    "request": {
                        "headers": {
                            # "host" is a comma-separated list of host names.
                            "Host": host.split(",") if host else [],
                        },
                    },
                },
            }
        else:
            stream["tcpSettings"] = {}
    elif net == "ws":
        stream["wsSettings"] = {
            "path": path,
            "headers": {
                "Host": host,
            },
        }
    elif net == "h2":
        stream["httpSettings"] = {
            "host": [host],
            "path": path,
        }
    elif net == "grpc":
        stream["grpcSettings"] = {
            "serviceName": path,
        }
    else:
        print(f"net {net} not support:", server_json)
        return None

    if server_json.get("tls") == "tls":
        stream["security"] = "tls"
        stream["tlsSettings"] = {
            # Empty when the link does not carry an explicit SNI.
            "serverName": server_json.get("sni") or "",
        }
    else:
        stream["security"] = "none"

    return {
        "protocol": protocol,
        "settings": {
            "vnext": [
                {
                    "address": address,
                    "port": int(server_json["port"]),
                    "users": [
                        user,
                    ],
                },
            ],
        },
        "streamSettings": stream,
        "mux": {
            "enabled": False,
            "concurrency": 8,
        },
        # Same fallback as parse_format_sip002: use the address as the label.
        "remarks": server_json.get("ps") or address,
    }
|
|
|
|
def get_servers_from_subscribe_url(url):
    """Download a subscription and parse every share link it contains.

    The response body is either a plain-text list of share links (one per
    line) or that same list encoded as base64.  Each line is tried with
    parse_format_sip002() first and parse_format_default() second.

    Args:
        url: subscription URL; anything urllib.request can open.

    Returns:
        A list of outbound dicts.  The list is empty when the download
        fails (the traceback is printed).  Lines that fail to parse are
        printed together with their traceback and skipped.
    """
    servers = []
    try:
        # No explicit "Host" header: urllib derives it from the URL, which
        # also covers host names containing '-' and URLs with a port.
        req = request.Request(url, headers={
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:98.0) Gecko/20100101 Firefox/98.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.7,en;q=0.3",
        }, method="GET")
        # The context manager closes the response even if read() fails.
        with request.urlopen(req, timeout=5.0) as response:
            text = response.read().decode("utf8")
    except Exception:
        print_exc()
        return servers

    # Every share link contains "://", and ':' is not part of any base64
    # alphabet, so a body without "://" can only be useful if it is base64.
    if "://" not in text:
        try:
            # Whitespace (line wrapping, trailing newline) is removed so the
            # padding is computed from the data characters only.
            text = base64_decode("".join(text.split()))
        except ValueError:
            # Covers binascii.Error and UnicodeDecodeError.  The body is
            # neither share links nor base64; the loop below yields nothing.
            pass

    for raw_line in text.split("\n"):
        line = raw_line.strip()  # also drops the '\r' of CRLF line endings
        if not line:
            continue
        try:
            server = parse_format_sip002(line)
            if server is None:
                server = parse_format_default(line)
            if server is not None:
                servers.append(server)
        except Exception:
            # One malformed entry must not discard the rest of the list.
            print(line)
            print_exc()
    return servers
|
|
|
|
def _update_servers():
    """Re-fetch every subscribed URL and store the combined server list."""
    servers = []
    for url in xray_manager_config.setdefault("subscribe", []):
        print("Fetching {}...".format(url))
        remote_servers = get_servers_from_subscribe_url(url)
        print("Added {} server(s)".format(len(remote_servers)))
        servers.extend(remote_servers)
    xray_manager_config["servers"] = servers
    xray_manager_config.commit()


def _select_server(servers, per_page=5):
    """Page through *servers* and write the chosen one into xray_config.

    The user can move between pages with '<' / '>' (repeat the character to
    move several pages), give up with 'q', or enter the index of a server.
    On a valid index, xray_config is rebuilt from xray_local_config with the
    chosen server (minus its "remarks" key) as the only outbound, and
    committed.
    """
    total = len(servers)
    if total == 0:
        # Without this guard the page count is 0 and '>' would move to page -1.
        print("Empty servers. Update servers from subscribe urls first.")
        return

    page_count = ceil(total / per_page)
    page = 0
    while True:
        print("q : Give up.")
        print("<<<<: Last page. '<' means 1 page, '<<' means 2 pages.")
        print(">>>>: Next page. '>' means 1 page, '>>' means 2 pages.")
        first = page * per_page
        for i in range(first, min(first + per_page, total)):
            print("{:< 4d}: {}".format(i, servers[i]["remarks"]))
        print(": PAGE {}/{}".format(page + 1, page_count))
        num_sel = input("> Please input a number or operators to continue: ")

        if num_sel.startswith("<"):
            page = max(0, page - num_sel.count("<"))
        elif num_sel.startswith(">"):
            page = min(page_count - 1, page + num_sel.count(">"))
        elif num_sel.lower() == "q":
            return
        else:
            try:
                index = int(num_sel)
            except ValueError:
                continue  # not a number: show the page again
            if 0 <= index < total:
                server = servers[index]
                # "remarks" is only a display label, not part of the outbound.
                out = {k: server[k] for k in server if k != "remarks"}
                xray_config.clear()
                for k in xray_local_config:
                    xray_config[k] = xray_local_config[k]
                xray_config["outbounds"] = [out]
                xray_config.commit()
                return


def main():
    """Run the interactive menu until the user quits.

    Menu entries: add / remove / list subscription URLs, refresh the server
    list from all subscriptions, and select the active server.  Ctrl-C or
    end of input (closed stdin) leaves the loop; any other error inside a
    menu action is printed with its traceback and the menu is shown again.
    """
    loop = True
    while loop:
        try:
            print("<==============================:")
            print(": What do you want to do?")
            print(": 1. Add a subscribe url.")
            print(": 2. Remove a subscribe url.")
            print(": 3. List all subscribe urls.")
            print(": 4. Update servers from subscribe urls.")
            print(": 5. Select the server from exists one.")
            print(": q. Quit.")
            func_sel = input(": Input a number to continue: ")
            print(":==============================>")

            if func_sel == "1":
                url = input("> Input a url to add: ").strip()
                if url:  # ignore an empty entry
                    xray_manager_config.setdefault("subscribe", []).append(url)
                    xray_manager_config.commit()
            elif func_sel == "2":
                i = int(input("> Input the index to remove: "))
                if "subscribe" in xray_manager_config and 0 <= i < len(xray_manager_config["subscribe"]):
                    del xray_manager_config["subscribe"][i]
                    xray_manager_config.commit()
            elif func_sel == "3":
                subscribes = xray_manager_config.setdefault("subscribe", [])
                for i, subscribe_url in enumerate(subscribes):
                    print("{:< 4d}: {}".format(i, subscribe_url))
                if len(subscribes) <= 0:
                    print("Empty subscribe urls.")
            elif func_sel == "4":
                _update_servers()
            elif func_sel == "5":
                _select_server(xray_manager_config.setdefault("servers", []))
            elif func_sel.lower() == "q":
                loop = False
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed; retrying input() would loop forever.
            loop = False
        except Exception:
            print_exc()
        print()
|
|
|
|
# Start the interactive menu only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()