事件鉤子
Hermes 提供了兩個鉤子系統,可在關鍵生命週期節點運行自定義代碼:
| 系統 | 註冊方式 | 運行環境 | 使用場景 |
|---|---|---|---|
| 網關鉤子 | HOOK.yaml + handler.py 存放在 ~/.hermes/hooks/ 目錄下 | 僅網關 | 日誌記錄、告警、Webhook |
| 插件鉤子 | 在 插件 中通過 ctx.register_hook() 註冊 | CLI + 網關 | 工具攔截、指標收集、安全策略 |
兩個系統均為非阻塞模式 —— 任何鉤子中的錯誤都會被捕獲並記錄,絕不會導致 Agent 崩潰。
網關事件鉤子
網關鉤子會在網關運行期間(Telegram、Discord、Slack、WhatsApp)自動觸發,且不會阻塞主 Agent 流程。
創建鉤子
每個鉤子是一個位於 ~/.hermes/hooks/ 下的目錄,包含兩個文件:
~/.hermes/hooks/
└── my-hook/
├── HOOK.yaml # 聲明要監聽哪些事件
└── handler.py # Python 處理函數
HOOK.yaml
name: my-hook
description: Log all agent activity to a file
events:
- agent:start
- agent:end
- agent:step
events 列表決定了哪些事件會觸發你的處理器。你可以訂閱任意組合的事件,包括通配符如 command:*。
handler.py
import json
from datetime import datetime
from pathlib import Path
LOG_FILE = Path.home() / ".hermes" / "hooks" / "my-hook" / "activity.log"
async def handle(event_type: str, context: dict):
"""Called for each subscribed event. Must be named 'handle'."""
entry = {
"timestamp": datetime.now().isoformat(),
"event": event_type,
**context,
}
with open(LOG_FILE, "a") as f:
f.write(json.dumps(entry) + "\n")
處理器規則:
- 必須命名為
handle - 接收
event_type(字符串)和context(字典) - 可以是
async def或普通def—— 兩者均有效 - 錯誤會被捕獲並記錄,絕不會導致 Agent 崩潰
可用事件
| 事件 | 觸發時機 | 上下文鍵 |
|---|---|---|
gateway:startup | 網關進程啟動時 | platforms(當前激活的平臺名稱列表) |
session:start | 新的消息會話創建時 | platform、user_id、session_id、session_key |
session:end | 會話結束(重置前) | platform、user_id、session_key |
session:reset | 用戶執行 /new 或 /reset 時 | platform、user_id、session_key |
agent:start | Agent 開始處理消息時 | platform、user_id、session_id、message |
agent:step | 每次工具調用循環的迭代 | platform、user_id、session_id、iteration、tool_names |
agent:end | Agent 完成處理時 | platform、user_id、session_id、message、response |
command:* | 任意斜槓命令執行時 | platform、user_id、command、args |
通配符匹配
註冊為 command:* 的處理器將對所有 command: 事件(如 command:model、command:reset 等)觸發。通過單一訂閱即可監控所有斜槓命令。
示例
啟動檢查清單(BOOT.md)—— 內置功能
網關自帶一個內置的 boot-md 鉤子,會在每次啟動時檢查 ~/.hermes/BOOT.md 文件是否存在。如果文件存在,Agent 將在後臺會話中執行其指令。無需安裝 —— 只需創建該文件即可。
創建 ~/.hermes/BOOT.md:
# 啟動清單
1. Check if any cron jobs failed overnight — run `hermes cron list`
2. Send a message to Discord #general saying "Gateway restarted, all systems go"
3. Check if /opt/app/deploy.log has any errors from the last 24 hours
Agent 會在後臺線程中運行這些指令,因此不會阻塞網關啟動。如果無需處理任何事項,Agent 將回復 [SILENT],且不會發送任何消息。
沒有 BOOT.md?該鉤子會靜默跳過 —— 無任何開銷。需要啟動自動化時創建文件,不需要時刪除即可。
長任務時發送 Telegram 告警
當 Agent 執行超過 10 步時,向自己發送一條消息:
# ~/.hermes/hooks/long-task-alert/HOOK.yaml
name: long-task-alert
description: Alert when agent is taking many steps
events:
- agent:step
# ~/.hermes/hooks/long-task-alert/handler.py
import os
import httpx
THRESHOLD = 10
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
CHAT_ID = os.getenv("TELEGRAM_HOME_CHANNEL")
async def handle(event_type: str, context: dict):
iteration = context.get("iteration", 0)
if iteration == THRESHOLD and BOT_TOKEN and CHAT_ID:
tools = ", ".join(context.get("tool_names", []))
text = f"⚠️ Agent has been running for {iteration} steps. Last tools: {tools}"
async with httpx.AsyncClient() as client:
await client.post(
f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage",
json={"chat_id": CHAT_ID, "text": text},
)
命令使用日誌記錄器
記錄哪些斜槓命令被使用:
# ~/.hermes/hooks/command-logger/HOOK.yaml
name: command-logger
description: Log slash command usage
events:
- command:*
# ~/.hermes/hooks/command-logger/handler.py
import json
from datetime import datetime
from pathlib import Path
LOG = Path.home() / ".hermes" / "logs" / "command_usage.jsonl"
def handle(event_type: str, context: dict):
LOG.parent.mkdir(parents=True, exist_ok=True)
entry = {
"ts": datetime.now().isoformat(),
"command": context.get("command"),
"args": context.get("args"),
"platform": context.get("platform"),
"user": context.get("user_id"),
}
with open(LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
會話開始時觸發 Webhook
在新會話創建時向外部服務發送 POST 請求:
# ~/.hermes/hooks/session-webhook/HOOK.yaml
name: session-webhook
description: Notify external service on new sessions
events:
- session:start
- session:reset
# ~/.hermes/hooks/session-webhook/handler.py
import httpx
WEBHOOK_URL = "https://your-service.example.com/hermes-events"
async def handle(event_type: str, context: dict):
async with httpx.AsyncClient() as client:
await client.post(WEBHOOK_URL, json={
"event": event_type,
**context,
}, timeout=5)
工作原理
- 網關啟動時,
HookRegistry.discover_and_load()掃描~/.hermes/hooks/目錄 - 每個包含
HOOK.yaml和handler.py的子目錄會被動態加載 - 處理器根據聲明的事件進行註冊
- 在每個生命週期節點,
hooks.emit()會觸發所有匹配的處理器 - 任何處理器中的錯誤都會被捕獲並記錄 —— 一個損壞的鉤子絕不會導致 Agent 崩潰
網關鉤子僅在 網關(Telegram、Discord、Slack、WhatsApp)中觸發。CLI 不會加載網關鉤子。如需在所有環境中工作的鉤子,請使用 插件鉤子。
插件鉤子
插件 可通過在插件的 register() 函數中調用 ctx.register_hook() 來註冊鉤子,這些鉤子會在 CLI 和網關 會話中均觸發。
def register(ctx):
ctx.register_hook("pre_tool_call", my_tool_observer)
ctx.register_hook("post_tool_call", my_tool_logger)
ctx.register_hook("pre_llm_call", my_memory_callback)
ctx.register_hook("post_llm_call", my_sync_callback)
ctx.register_hook("on_session_start", my_init_callback)
ctx.register_hook("on_session_end", my_cleanup_callback)
所有鉤子的通用規則:
- 回調函數接收 關鍵字參數。始終接受
**kwargs以保證向前兼容 —— 未來版本可能會添加新參數,但不會破壞你的插件。 - 如果回調函數 崩潰,它會被記錄並跳過。其他鉤子和 Agent 將繼續正常運行。行為異常的插件絕不會導致 Agent 崩潰。
- 所有鉤子均為 一次性觀察者,其返回值被忽略 —— 除了
pre_llm_call,它可以 注入上下文。
快速參考
| 鉤子 | 觸發時機 | 返回值 |
|---|---|---|
pre_tool_call | 在任何工具執行前 | 忽略 |
post_tool_call | 在任何工具返回後 | 忽略 |
pre_llm_call | 每輪對話開始時,工具調用循環之前 | 上下文注入 |
post_llm_call | 每輪對話結束時,工具調用循環之後 | 忽略 |
on_session_start | 新會話創建時(僅第一輪) | 忽略 |
on_session_end | 會話結束時 | 忽略 |
pre_tool_call
在每次工具執行之前立即觸發——包括內置工具和插件工具。
回調簽名:
def my_callback(tool_name: str, args: dict, task_id: str, **kwargs):
| 參數 | 類型 | 描述 |
|---|---|---|
tool_name | str | 即將執行的工具名稱(例如 "terminal"、"web_search"、"read_file") |
args | dict | 模型傳遞給該工具的參數 |
task_id | str | 會話/任務標識符。未設置時為空字符串。 |
觸發位置: 在 model_tools.py 中的 handle_function_call() 內部,在工具處理器運行之前。每調用一次工具即觸發一次——如果模型並行調用 3 個工具,則此鉤子將觸發 3 次。
返回值: 忽略。
使用場景: 日誌記錄、審計追蹤、工具調用計數、阻止危險操作(打印警告)、速率限制。
示例 —— 工具調用審計日誌:
import json, logging
from datetime import datetime
logger = logging.getLogger(__name__)
def audit_tool_call(tool_name, args, task_id, **kwargs):
logger.info("TOOL_CALL session=%s tool=%s args=%s",
task_id, tool_name, json.dumps(args)[:200])
def register(ctx):
ctx.register_hook("pre_tool_call", audit_tool_call)
示例 —— 對危險工具發出警告:
DANGEROUS = {"terminal", "write_file", "patch"}
def warn_dangerous(tool_name, **kwargs):
if tool_name in DANGEROUS:
print(f"⚠ Executing potentially dangerous tool: {tool_name}")
def register(ctx):
ctx.register_hook("pre_tool_call", warn_dangerous)
post_tool_call
在每次工具執行返回後立即觸發。
回調簽名:
def my_callback(tool_name: str, args: dict, result: str, task_id: str, **kwargs):
| 參數 | 類型 | 描述 |
|---|---|---|
tool_name | str | 剛剛執行完畢的工具名稱 |
args | dict | 模型傳遞給該工具的參數 |
result | str | 工具的返回值(始終為 JSON 字符串) |
task_id | str | 會話/任務標識符。未設置時為空字符串。 |
觸發位置: 在 model_tools.py 中的 handle_function_call() 內部,在工具處理器返回之後。每調用一次工具即觸發一次。如果工具拋出未處理的異常(異常被捕獲並以錯誤 JSON 字符串形式返回),則 post_tool_call 仍會觸發,且 result 參數為該錯誤字符串。
返回值: 忽略。
使用場景: 記錄工具結果、指標收集、跟蹤工具成功率/失敗率、特定工具完成時發送通知。
示例 —— 跟蹤工具使用指標:
from collections import Counter
import json
_tool_counts = Counter()
_error_counts = Counter()
def track_metrics(tool_name, result, **kwargs):
_tool_counts[tool_name] += 1
try:
parsed = json.loads(result)
if "error" in parsed:
_error_counts[tool_name] += 1
except (json.JSONDecodeError, TypeError):
pass
def register(ctx):
ctx.register_hook("post_tool_call", track_metrics)
pre_llm_call
每輪對話僅觸發一次,在工具調用循環開始前觸發。這是唯一一個返回值會被使用的鉤子——它可以將上下文注入到當前輪次的用戶消息中。
回調簽名:
def my_callback(session_id: str, user_message: str, conversation_history: list,
is_first_turn: bool, model: str, platform: str, **kwargs):
| 參數 | 類型 | 描述 |
|---|---|---|
session_id | str | 當前會話的唯一標識符 |
user_message | str | 當前輪次用戶原始消息(在任何技能注入前) |
conversation_history | list | 完整消息列表的副本(OpenAI 格式:[{"role": "user", "content": "..."}]) |
is_first_turn | bool | 如果是新會話的第一輪,則為 True;後續輪次為 False |
model | str | 模型標識符(例如 "anthropic/claude-sonnet-4.6") |
platform | str | 會話運行的平臺:"cli"、"telegram"、"discord" 等。 |
觸發位置: 在 run_agent.py 中的 run_conversation() 內部,上下文壓縮之後、主 while 循環之前。每調用一次 run_conversation() 即觸發一次(即每輪用戶輸入觸發一次),而非在工具循環內的每次 API 調用時觸發。
返回值: 如果回調返回一個包含 "context" 鍵的字典,或一個非空字符串,則該文本將被追加到當前輪次的用戶消息末尾。返回 None 表示不注入上下文。
# 注入context
return {"context": "Recalled memories:\n- User likes Python\n- Working on hermes-agent"}
# 純字符串(等效)
return "Recalled memories:\n- User likes Python"
# 無需注射
return None
上下文注入位置: 始終注入到用戶消息中,從不注入到系統提示中。這保留了提示緩存——系統提示在各輪之間保持一致,因此緩存的 token 可被複用。系統提示屬於 Hermes 的範疇(模型引導、工具強制、個性、技能)。插件則在用戶輸入旁貢獻上下文。
所有注入的上下文均為臨時性——僅在 API 調用時添加。對話歷史中的原始用戶消息不會被修改,且不會持久化到會話數據庫中。
當多個插件返回上下文時,它們的輸出將按插件發現順序(按目錄名字母順序)用雙換行符連接。
使用場景: 記憶召回、RAG 上下文注入、安全護欄、每輪分析。
示例 —— 記憶召回:
import httpx
MEMORY_API = "https://your-memory-api.example.com"
def recall(session_id, user_message, is_first_turn, **kwargs):
try:
resp = httpx.post(f"{MEMORY_API}/recall", json={
"session_id": session_id,
"query": user_message,
}, timeout=3)
memories = resp.json().get("results", [])
if not memories:
return None
text = "Recalled context:\n" + "\n".join(f"- {m['text']}" for m in memories)
return {"context": text}
except Exception:
return None
def register(ctx):
ctx.register_hook("pre_llm_call", recall)
示例 —— 安全護欄:
POLICY = "Never execute commands that delete files without explicit user confirmation."
def guardrails(**kwargs):
return {"context": POLICY}
def register(ctx):
ctx.register_hook("pre_llm_call", guardrails)
post_llm_call
每輪對話僅觸發一次,在工具調用循環完成後,Agent 生成最終響應時觸發。僅在成功完成的輪次中觸發——若該輪被中斷,則不會觸發。
回調簽名:
def my_callback(session_id: str, user_message: str, assistant_response: str,
conversation_history: list, model: str, platform: str, **kwargs):
| 參數 | 類型 | 描述 |
|---|---|---|
session_id | str | 當前會話的唯一標識符 |
user_message | str | 當前輪次中用戶的原始消息 |
assistant_response | str | Agent 在當前輪次的最終文本響應 |
conversation_history | list | 當輪次完成後完整的消息列表副本 |
model | str | 模型標識符 |
platform | str | 會話運行的平臺 |
觸發時機: 在 run_agent.py 中的 run_conversation() 函數內,工具循環退出並生成最終響應後觸發。受 if final_response and not interrupted 保護 —— 因此當用戶在輪次中途中斷或 Agent 達到迭代限制但未生成響應時,不會觸發。
返回值: 忽略。
使用場景: 將對話數據同步到外部記憶系統、計算響應質量指標、記錄輪次摘要、觸發後續操作。
示例 —— 同步到外部記憶系統:
import httpx
MEMORY_API = "https://your-memory-api.example.com"
def sync_memory(session_id, user_message, assistant_response, **kwargs):
try:
httpx.post(f"{MEMORY_API}/store", json={
"session_id": session_id,
"user": user_message,
"assistant": assistant_response,
}, timeout=5)
except Exception:
pass # 盡力而為
def register(ctx):
ctx.register_hook("post_llm_call", sync_memory)
示例 —— 跟蹤響應長度:
import logging
logger = logging.getLogger(__name__)
def log_response_length(session_id, assistant_response, model, **kwargs):
logger.info("RESPONSE session=%s model=%s chars=%d",
session_id, model, len(assistant_response or ""))
def register(ctx):
ctx.register_hook("post_llm_call", log_response_length)
on_session_start
在創建全新會話時僅觸發一次。在會話續接時(用戶發送第二條消息到現有會話)不會觸發。
回調簽名:
def my_callback(session_id: str, model: str, platform: str, **kwargs):
| 參數 | 類型 | 描述 |
|---|---|---|
session_id | str | 新會話的唯一標識符 |
model | str | 模型標識符 |
platform | str | 會話運行的平臺 |
觸發時機: 在 run_agent.py 中的 run_conversation() 函數內,新會話的第一輪中觸發 —— 具體是在系統提示構建完成後、工具循環開始前。判斷條件為 if not conversation_history(無先前消息 = 新會話)。
返回值: 忽略。
使用場景: 初始化會話範圍的狀態、預熱緩存、向外部服務註冊會話、記錄會話啟動日誌。
示例 —— 初始化會話緩存:
_session_caches = {}
def init_session(session_id, model, platform, **kwargs):
_session_caches[session_id] = {
"model": model,
"platform": platform,
"tool_calls": 0,
"started": __import__("datetime").datetime.now().isoformat(),
}
def register(ctx):
ctx.register_hook("on_session_start", init_session)
on_session_end
在每次 run_conversation() 調用的最末尾觸發,無論結果如何。如果用戶在 Agent 處理過程中退出(如按 Ctrl+C 或輸入 /exit),也會從 CLI 的退出處理器中觸發。
回調簽名:
def my_callback(session_id: str, completed: bool, interrupted: bool,
model: str, platform: str, **kwargs):
| 參數 | 類型 | 描述 |
|---|---|---|
session_id | str | 會話的唯一標識符 |
completed | bool | 如果 Agent 生成了最終響應則為 True,否則為 False |
interrupted | bool | 如果本輪被中斷(用戶發送新消息、輸入 /stop 或退出)則為 True |
model | str | 模型標識符 |
platform | str | 會話運行的平臺 |
觸發位置:
run_agent.py—— 每次run_conversation()調用結束後,所有清理操作完成後。無論本輪是否出錯,都會觸發。cli.py—— 在 CLI 的 atexit 處理器中觸發,但僅當Agent 處於處理中狀態(_agent_running=True)時才觸發。這會捕獲在處理過程中按 Ctrl+C 或輸入/exit的情況。此時completed=False且interrupted=True。
返回值: 忽略。
使用場景: 刷新緩衝區、關閉連接、持久化會話狀態、記錄會話時長、清理在 on_session_start 中初始化的資源。
示例 —— 刷新並清理:
_session_caches = {}
def cleanup_session(session_id, completed, interrupted, **kwargs):
cache = _session_caches.pop(session_id, None)
if cache:
# 將累積數據刷新到磁盤或外部服務
status = "completed" if completed else ("interrupted" if interrupted else "failed")
print(f"Session {session_id} ended: {status}, {cache['tool_calls']} tool calls")
def register(ctx):
ctx.register_hook("on_session_end", cleanup_session)
示例 —— 會話時長跟蹤:
import time, logging
logger = logging.getLogger(__name__)
_start_times = {}
def on_start(session_id, **kwargs):
_start_times[session_id] = time.time()
def on_end(session_id, completed, interrupted, **kwargs):
start = _start_times.pop(session_id, None)
if start:
duration = time.time() - start
logger.info("SESSION_DURATION session=%s seconds=%.1f completed=%s interrupted=%s",
session_id, duration, completed, interrupted)
def register(ctx):
ctx.register_hook("on_session_start", on_start)
ctx.register_hook("on_session_end", on_end)
有關完整指南,請參閱 構建插件指南,其中包含工具模式、處理器和高級鉤子模式的詳細說明。