模块 nonebot_plugin_marshoai.observer
此模块用于注册观察者函数,使用watchdog监控文件变化并重启bot 启用该模块需要在配置文件中设置dev_mode
为True
var CALLBACK_FUNC
说明: 位置1为FileSystemEvent
类型:
TypeAlias
默认值:
Callable[[FileSystemEvent], None]
var FILTER_FUNC
说明: 位置1为FileSystemEvent
类型:
TypeAlias
默认值:
Callable[[FileSystemEvent], bool]
func debounce(wait)
说明: 防抖函数
源代码 或 在GitHub上查看
python
def debounce(wait):
def decorator(func):
def wrapper(*args, **kwargs):
nonlocal last_call_time
current_time = time.time()
if current_time - last_call_time > wait:
last_call_time = current_time
return func(*args, **kwargs)
last_call_time = None
return wrapper
return decorator
@driver.on_startup
async func check_for_reloader()
源代码 或 在GitHub上查看
python
@driver.on_startup
async def check_for_reloader():
if config.marshoai_devmode:
logger.debug('Marsho Reload enabled, watching for file changes...')
observer.start()
class CodeModifiedHandler(FileSystemEventHandler)
@debounce(1)
func on_modified(self, event)
源代码 或 在GitHub上查看
python
@debounce(1)
def on_modified(self, event):
raise NotImplementedError('on_modified must be implemented')
func on_created(self, event)
源代码 或 在GitHub上查看
python
def on_created(self, event):
self.on_modified(event)
func on_deleted(self, event)
源代码 或 在GitHub上查看
python
def on_deleted(self, event):
self.on_modified(event)
func on_moved(self, event)
源代码 或 在GitHub上查看
python
def on_moved(self, event):
self.on_modified(event)
func on_any_event(self, event)
源代码 或 在GitHub上查看
python
def on_any_event(self, event):
self.on_modified(event)
func on_file_system_event(directories: tuple[str, ...], recursive: bool = True, event_filter: FILTER_FUNC | None = None) -> Callable[[CALLBACK_FUNC], CALLBACK_FUNC]
说明: 注册文件系统变化监听器
参数:
- directories: 监听目录们
- recursive: 是否递归监听子目录
- event_filter: 事件过滤器, 返回True则执行回调函数
返回: 装饰器,装饰一个函数在接收到数据后执行
源代码 或 在GitHub上查看
python
def on_file_system_event(directories: tuple[str, ...], recursive: bool=True, event_filter: FILTER_FUNC | None=None) -> Callable[[CALLBACK_FUNC], CALLBACK_FUNC]:
def decorator(func: CALLBACK_FUNC) -> CALLBACK_FUNC:
def wrapper(event: FileSystemEvent):
if event_filter is not None and (not event_filter(event)):
return
func(event)
code_modified_handler = CodeModifiedHandler()
code_modified_handler.on_modified = wrapper
for directory in directories:
observer.schedule(code_modified_handler, directory, recursive=recursive)
return func
return decorator