This commit is contained in:
2025-11-04 17:39:08 +08:00
parent 60a26d9860
commit 5c564dfc66
4 changed files with 140 additions and 95 deletions

View File

@@ -39,15 +39,24 @@ def main():
from .storage.vnode import VFileSystem, new_storage_file, new_storage_dir, dump_storage_item_lines
from pathlib import PurePosixPath
fs = VFileSystem()
d = new_storage_dir("dirs1", "dirt1", [
new_storage_file("srn1", "tgn1"),
new_storage_file("srn2", "tgn2"),
new_storage_file("srn3", "tgn3"),
new_storage_file("srn4", "tgn4"),
d1 = new_storage_dir("dirt1", [
new_storage_file("tgn1"),
new_storage_file("tgn2"),
new_storage_file("tgn3"),
new_storage_file("tgn4"),
])
fs.root["c"].append(d)
fs.root["c"].append(new_storage_file(t="fsafsa"))
print(fs.listdir(PurePosixPath("dirt1")))
for line in dump_storage_item_lines(fs.root):
print(line)
d2 = new_storage_dir("dirt2", [
d1,
new_storage_file("tgn1"),
new_storage_file("tgn2"),
new_storage_file("tgn3"),
new_storage_file("tgn4"),
])
d3 = new_storage_dir("dirt3", [])
fs._root["c"].append(d2)
fs._root["c"].append(new_storage_file(n="fsafsa"))
fs._root["c"].append(d3)
print(fs)
fs.remove(PurePosixPath("dirt2/dirt1/tgn1"))
print(fs)

View File

@@ -3,6 +3,7 @@ from .storage import Storage
from pathlib import PurePath, PurePosixPath
from ..config import StorageItem
from .vnode import VFileSystem
from typing import BinaryIO, Literal
class LocalStorage(Storage):
def __init__(self, base_path: str) -> None:
@@ -16,34 +17,5 @@ class LocalStorage(Storage):
raise ValueError("config type mismatch!")
return LocalStorage(cfg["path"])
@classmethod
def support_random_access(cls) -> bool:
return True
def listdir(self, path=PurePosixPath("")) -> list[PurePosixPath] | Awaitable[list[PurePosixPath]]:
raise NotImplementedError
def exists(self, path: PurePosixPath) -> bool | Awaitable[bool]:
raise NotImplementedError
def is_dir(self, path: PurePosixPath) -> bool | Awaitable[bool]:
raise NotImplementedError
def is_file(self, path: PurePosixPath) -> bool | Awaitable[bool]:
raise NotImplementedError
def get_source_path(self, path: PurePosixPath) -> PurePath | Awaitable[PurePath]:
raise NotImplementedError
def get_storage_path(self, path: PurePath) -> PurePosixPath | Awaitable[PurePosixPath]:
raise NotImplementedError
def backup_file(self, source: PurePath, target: PurePosixPath) -> None | Awaitable[None]:
raise NotImplementedError
def restore_file(self, source: PurePosixPath, target: PurePath) -> None | Awaitable[None]:
raise NotImplementedError
def remove(self, path: PurePosixPath) -> None | Awaitable[None]:
raise NotImplementedError

View File

@@ -1,8 +1,8 @@
from ..config import StorageItem
from abc import ABC, abstractmethod
from pathlib import PurePosixPath, PurePath
from collections.abc import Awaitable
from inspect import isawaitable
from pathlib import PurePosixPath
from collections.abc import Awaitable, Iterable, AsyncIterable
from typing import BinaryIO, Literal
class Storage(ABC):
@classmethod
@@ -14,9 +14,9 @@ class Storage(ABC):
def from_config(cls, cfg: StorageItem) -> 'Storage':
...
@classmethod
@abstractmethod
def support_random_access(cls) -> bool:
def sync_storage() -> Iterable[tuple[int,int]] | AsyncIterable[tuple[int,int]]:
""" sync storage, yield (current, total) progress """
...
@abstractmethod
@@ -40,26 +40,21 @@ class Storage(ABC):
...
@abstractmethod
def get_source_path(self, path: PurePosixPath) -> PurePath | Awaitable[PurePath]:
""" get the file's source path (the location when it was backuped) """
...
@abstractmethod
def get_storage_path(self, path: PurePath) -> PurePosixPath | Awaitable[PurePosixPath]:
""" get the file's storage path (the location in the storage) """
...
@abstractmethod
def backup_file(self, source: PurePath, target: PurePosixPath) -> None | Awaitable[None]:
""" copy file from local to storage """
...
@abstractmethod
def restore_file(self, source: PurePosixPath, target: PurePath) -> None | Awaitable[None]:
""" copy file from storage to local """
def open(self, path: PurePosixPath, mode: Literal["r", "w"] = "r") -> BinaryIO | Awaitable[BinaryIO]:
""" open file-like object for file operation """
...
@abstractmethod
def remove(self, path: PurePosixPath) -> None | Awaitable[None]:
""" delete file or dir from storage """
""" delete file or dir from storage, dir should be empty """
...
@abstractmethod
def rmtree(self, path: PurePosixPath) -> None | Awaitable[None]:
""" remove dir tree, remove all contents """
...
@abstractmethod
def makedirs(self, path: PurePosixPath, exists_ok=False) -> None | Awaitable[None]:
""" create dir """
...

View File

@@ -1,27 +1,27 @@
from typing import TypedDict, Literal
from pathlib import PurePath, PurePosixPath
from typing import TypedDict, Literal, BinaryIO
from collections.abc import Awaitable
from pathlib import PurePosixPath
from functools import cmp_to_key
from io import BytesIO
class StorageFile(TypedDict):
ty: Literal["file"] # type
s: str # source name
t: str # target name
n: str # virtual file name
mt: int # modify time
sz: int # size
class StorageDir(TypedDict):
ty: Literal["dir"] # type
s: str # source name
t: str # target name
n: str # virtual file name
c: list["StorageItem"] # dir content
StorageItem = StorageFile | StorageDir
def new_storage_dir(s="", t="", c=[]) -> StorageDir:
return StorageDir(ty="dir", s=s, t=t, c=c)
def new_storage_dir(n="", c=[]) -> StorageDir:
return StorageDir(ty="dir", n=n, c=c)
def new_storage_file(s="", t="", mt=0, sz=0) -> StorageFile:
return StorageFile(ty="file", s=s, t=t, mt=mt, sz=sz)
def new_storage_file(n="", mt=0, sz=0) -> StorageFile:
return StorageFile(ty="file", n=n, mt=mt, sz=sz)
class VFSError(RuntimeError):
def __init__(self, *args: object) -> None:
@@ -31,22 +31,26 @@ class FileNotFoundError(VFSError):
def __init__(self, p) -> None:
super().__init__(f"FileNotFound: {p}")
class FileIsNotDirError(VFSError):
class NotADirectoryError(VFSError):
def __init__(self, p) -> None:
super().__init__(f"FileIsNotDir: {p}")
super().__init__(f"NotADirectory: {p}")
class DirectoryNotEmptyError(VFSError):
def __init__(self, p) -> None:
super().__init__(f"DirectoryNotEmpty: {p}")
def find_vnode(root: StorageDir, path: PurePath, search_key: Literal["s", "t"]) -> list[StorageItem]:
def find_vnode(root: StorageDir, path: PurePosixPath) -> list[StorageItem]:
if len(path.parts) > 0:
p = path.parts[0]
for n in root["c"]:
if n[search_key] == p:
if n["n"] == p:
if len(path.parts) == 1:
return [n, ]
if n["ty"] != "dir":
raise FileNotFoundError(p)
next_path = PurePosixPath(*path.parts[1:]) if search_key == "t" else PurePath(*path.parts[1:])
nodes = find_vnode(n, next_path, search_key)
next_path = PurePosixPath(*path.parts[1:])
nodes = find_vnode(n, next_path)
return [n, *nodes]
raise FileNotFoundError(path)
return [root]
@@ -59,9 +63,9 @@ def _cmp_storage_item1(item1: StorageItem, item2: StorageItem):
else:
return 1
# compare name
if item1["t"] > item2["t"]:
if item1["n"] > item2["n"]:
return 1
elif item1["t"] < item2["t"]:
elif item1["n"] < item2["n"]:
return -1
return 0
@@ -73,9 +77,9 @@ def _cmp_storage_item2(item1: StorageItem, item2: StorageItem):
else:
return -1
# compare name
if item1["t"] > item2["t"]:
if item1["n"] > item2["n"]:
return 1
elif item1["t"] < item2["t"]:
elif item1["n"] < item2["n"]:
return -1
return 0
@@ -85,9 +89,9 @@ def sort_storage_items(items: list[StorageItem], dir_first = True):
def dump_storage_item_lines(item: StorageItem, indent=0):
indent_text = " " * indent
if item["ty"] == "file":
return [f"{indent_text}📄{item["t"]}: size {item["sz"]}"]
return [f"{indent_text}📄{item["n"]}: size {item["sz"]}"]
elif item["ty"] == "dir":
lines = [f"{indent_text}📂{item["t"]}:"]
lines = [f"{indent_text}📂{item["n"]}:"]
sort_storage_items(item["c"], False)
for n in item["c"]:
lines.extend(dump_storage_item_lines(n, indent=indent + 1))
@@ -96,35 +100,100 @@ def dump_storage_item_lines(item: StorageItem, indent=0):
class VFileSystem():
def __init__(self) -> None:
self.root = new_storage_dir()
self._root = new_storage_dir()
def __repr__(self):
return "\n".join(dump_storage_item_lines(self._root))
def _on_remove_node(self, node_path: list[StorageItem]) -> None | Awaitable[None]:
parents_path = PurePosixPath( *(n["n"] for n in node_path) )
print("remove:", parents_path)
pass
def _op_open_node(self, node_path: list[StorageItem]) -> BinaryIO | Awaitable[BinaryIO]:
return BytesIO()
def listdir(self, path=PurePosixPath("")) -> list[PurePosixPath]:
nodes = find_vnode(self.root, path, "t")
base_path = PurePosixPath( *(n["t"] for n in nodes) )
nodes = find_vnode(self._root, path)
base_path = PurePosixPath( *(n["n"] for n in nodes) )
item = nodes[-1]
if item["ty"] != "dir":
raise FileIsNotDirError(path)
raise NotADirectoryError(path)
sort_storage_items(item["c"], True)
return [ base_path.joinpath(n["t"]) for n in item["c"]]
return [ base_path.joinpath(n["n"]) for n in item["c"]]
def exists(self, path: PurePosixPath) -> bool:
try:
_ = find_vnode(self.root, path, "t")
_ = find_vnode(self._root, path)
return True
except FileNotFoundError:
return False
def is_dir(self, path: PurePosixPath) -> bool:
raise NotImplementedError
node = find_vnode(self._root, path)[-1]
return node["ty"] == "dir"
def is_file(self, path: PurePosixPath) -> bool:
raise NotImplementedError
def get_source_path(self, path: PurePosixPath) -> PurePath:
raise NotImplementedError
def get_storage_path(self, path: PurePath) -> PurePosixPath:
raise NotImplementedError
node = find_vnode(self._root, path)[-1]
return node["ty"] == "file"
def remove(self, path: PurePosixPath) -> None:
raise NotImplementedError
nodes = find_vnode(self._root, path)
if len(nodes) == 1:
# in the root
parent = self._root
elif len(nodes) >= 2:
parent = nodes[-2]
else:
raise FileNotFoundError(path)
last = nodes[-1]
if last["ty"] == "dir":
if len(last["c"]) > 0:
raise DirectoryNotEmptyError(path)
# remove callback
self._on_remove_node(nodes)
if parent["ty"] == "dir":
parent["c"].remove(last)
def rmtree(self, path: PurePosixPath) -> None:
nodes = find_vnode(self._root, path)
if len(nodes) == 1:
# in the root
parent = self._root
prefix = []
elif len(nodes) >= 2:
parent = nodes[-2]
prefix = nodes[:-1]
else:
raise FileNotFoundError(path)
last = nodes[-1]
vstack = [last]
while True:
last = vstack[-1]
# process dir
if last["ty"] == "dir":
if len(last["c"]) > 0:
# remove child at the last loop
vstack.append(last["c"][0])
parent = last
continue
# remove callback
node_path = list(prefix)
node_path.extend(vstack)
self._on_remove_node(node_path)
# remove self from parent
if parent["ty"] == "dir":
parent["c"].remove(last)
vstack.pop()
if len(vstack) >= 2:
parent = vstack[-2]
elif len(vstack) == 1:
parent = prefix[-1] if len(prefix) >= 1 else self._root
else:
pass
else:
raise NotADirectoryError(parent["n"])
# check if anything left
if len(vstack) <= 0:
break