AIOps在DevOps日常工作中如何落地?这里分享3-Stage Pipeline(三段式:感知 -> 单点大模型决策 -> 协同与双写记忆)架构,一套高度集成、贴近生产环境的核心代码。
为了可以直接 Copy 并以最小成本在本地或测试环境中跑通,这里使用 FastAPI 作为主干,内嵌了SQLite (代表关系型数据库) 和 Qdrant 的本地持久化模式 (代表向量数据库)。
一、 生产环境依赖准备
在运行代码前,请确保安装了以下核心依赖(可以将其保存为 requirements.txt):
fastapi==0.104.1
uvicorn==0.24.0
sqlalchemy==2.0.23
pydantic==2.5.2
qdrant-client==1.7.0
sentence-transformers==2.2.2
langchain==0.0.350
langchain-google-genai==0.0.5
执行安装:pip install -r requirements.txt
二、 核心代码:main.py
这是整个 AIOps 闭环的核心程序。它包含了 Webhook 接收、历史记忆检索、大模型结构化分析(输出标准 JSON)、以及人工审批入库的完整逻辑。
import os
import json
import threading
from datetime import datetime
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks, HTTPException, Request
from pydantic import BaseModel, Field
import uvicorn
# 数据库相关
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
# 向量检索相关
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer
# AI/LLM 相关
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import PromptTemplate
from langchain.output_parsers import PydanticOutputParser
# ==========================================
# 0. 全局配置与初始化
# ==========================================
os.environ["GOOGLE_API_KEY"] = "YOUR_GEMINI_API_KEY_HERE" # 请替换为您的 Key
# --- 1. 关系型数据库 (SQLite 用于演示,生产替换为 PostgreSQL/MySQL) ---
SQLALCHEMY_DATABASE_URL = "sqlite:///./aiops_production.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class IncidentRecord(Base):
"""故障工单状态表"""
__tablename__ = "incidents"
id = Column(Integer, primary_key=True, index=True)
alert_name = Column(String, index=True)
raw_alert = Column(Text)
ai_rca = Column(Text) # AI 根因分析
ai_action_plan = Column(Text) # AI 修复方案
status = Column(String, default="PENDING") # PENDING, APPROVED, REJECTED
created_at = Column(DateTime, default=datetime.utcnow)
approved_by = Column(String, nullable=True)
Base.metadata.create_all(bind=engine)
# --- 2. 向量数据库 (Qdrant 本地文件模式,生产替换为 Server 地址) ---
QDRANT_COLLECTION = "aiops_knowledge"
qdrant_client = QdrantClient(path="./qdrant_storage")
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
# 初始化 Qdrant 集合
if not qdrant_client.record_exists(collection_name=QDRANT_COLLECTION, ids=[1]):
try:
qdrant_client.create_collection(
collection_name=QDRANT_COLLECTION,
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
)
except Exception:
pass
# --- 3. 大模型与结构化输出配置 ---
llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", temperature=0.1)
class AIAnalysisResult(BaseModel):
rca: str = Field(description="根本原因分析 (Root Cause Analysis),解释为什么会产生此告警。")
action_plan: str = Field(description="分步的故障修复排查方案 (Action Plan),包含具体的 CLI 命令。")
parser = PydanticOutputParser(pydantic_object=AIAnalysisResult)
prompt_template = PromptTemplate(
template="""你是一个资深的云原生 AIOps 专家。请根据以下信息进行故障诊断。
【当前 Prometheus 告警】:
{alert_data}
【知识库中历史相似故障经验】:
{history_context}
请严格按照以下 JSON 格式输出你的分析结果,不要包含任何额外的 markdown 标记或废话:
{format_instructions}
""",
input_variables=["alert_data", "history_context"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
# ==========================================
# 1. 核心 AI 处理流 (后台任务)
# ==========================================
def process_alert_pipeline(alert: dict):
"""阶段一 & 阶段二:检索上下文并进行大模型决策"""
db = SessionLocal()
try:
alert_name = alert.get("labels", {}).get("alertname", "UnknownAlert")
alert_desc = alert.get("annotations", {}).get("description", str(alert))
print(f"[*] 开始处理告警: {alert_name}")
# 1. 向量检索历史记忆
vector = embedding_model.encode(alert_desc).tolist()
search_results = qdrant_client.search(
collection_name=QDRANT_COLLECTION,
query_vector=vector,
limit=2,
score_threshold=0.7 # 只取相关度高的历史
)
history_context = "\n".join([hit.payload['content'] for hit in search_results]) if search_results else "无历史类似告警记录。"
# 2. 大模型推理决策
print(f"[*] 正在请求大模型进行 RCA 和方案生成...")
_input = prompt_template.format(alert_data=json.dumps(alert, ensure_ascii=False), history_context=history_context)
response = llm.predict(_input)
# 3. 解析结构化输出
ai_result: AIAnalysisResult = parser.parse(response)
# 4. 阶段三前置:存入关系型数据库,等待人类审批
new_record = IncidentRecord(
alert_name=alert_name,
raw_alert=json.dumps(alert, ensure_ascii=False),
ai_rca=ai_result.rca,
ai_action_plan=ai_result.action_plan
)
db.add(new_record)
db.commit()
print(f"[+] AI 分析完成,已存入待审批看板 (工单ID: {new_record.id})")
# TODO (生产环境): 此处可调用飞书/钉钉 API 推送消息:“发现新告警及 AI 修复方案,请前往看板审批”
except Exception as e:
print(f"[-] AI 处理流水线异常: {str(e)}")
finally:
db.close()
def push_to_qdrant(knowledge_text: str):
"""阶段三后置:将审批通过的高质量经验沉淀为向量记忆"""
import uuid
try:
vector = embedding_model.encode(knowledge_text).tolist()
point_id = str(uuid.uuid4())
qdrant_client.upsert(
collection_name=QDRANT_COLLECTION,
points=[PointStruct(id=point_id, vector=vector, payload={"content": knowledge_text})]
)
print(f"[+] 知识闭环完成!高质量经验已存入 Qdrant 向量库。")
except Exception as e:
print(f"[-] 向量化存储失败: {str(e)}")
# ==========================================
# 2. FastAPI Web 服务路由
# ==========================================
app = FastAPI(title="AIOps Production API")
@app.post("/webhook/alerts")
async def prometheus_webhook(request: Request, background_tasks: BackgroundTasks):
"""Prometheus Alertmanager 触发入口"""
payload = await request.json()
alerts = payload.get("alerts", [])
for alert in alerts:
if alert.get("status") == "firing":
# 将每个告警的处理丢进后台线程,确保 Webhook 接口毫秒级返回
background_tasks.add_task(process_alert_pipeline, alert)
return {"status": "success", "message": f"Received {len(alerts)} alerts. AI processing started in background."}
@app.get("/api/incidents/pending")
def get_pending_incidents():
"""获取看板中待审批的 AI 诊断工单"""
db = SessionLocal()
records = db.query(IncidentRecord).filter(IncidentRecord.status == "PENDING").all()
db.close()
return records
class ApprovalRequest(BaseModel):
record_id: int
approver_name: str
edited_action_plan: str = None # 如果 AI 的命令有瑕疵,人类修改后的最终版
@app.post("/api/incidents/approve")
def approve_incident(req: ApprovalRequest, background_tasks: BackgroundTasks):
"""看板审批接口:人类审核通过,触发双写闭环"""
db = SessionLocal()
record = db.query(IncidentRecord).filter(IncidentRecord.id == req.record_id).first()
if not record:
db.close()
raise HTTPException(status_code=404, detail="工单未找到")
if record.status != "PENDING":
db.close()
raise HTTPException(status_code=400, detail="该工单已被处理")
# 1. 更新关系库状态
record.status = "APPROVED"
record.approved_by = req.approver_name
final_plan = req.edited_action_plan if req.edited_action_plan else record.ai_action_plan
record.ai_action_plan = final_plan
db.commit()
db.close()
# 2. 异步将最终标准 SOP 写入向量库
knowledge_text = f"【告警特征】:\n{record.raw_alert}\n\n【根因分析】:\n{record.ai_rca}\n\n【标准修复SOP】:\n{final_plan}"
background_tasks.add_task(push_to_qdrant, knowledge_text)
return {"status": "success", "message": f"工单 {req.record_id} 审批通过,知识已沉淀。"}
if __name__ == "__main__":
print("🚀 AIOps Backend Server is running...")
uvicorn.run(app, host="0.0.0.0", port=8000)
三、 如何在本地测试这套完整流程?
在终端启动您的服务:
python main.py
步骤 1:模拟 Prometheus 发送告警 Webhook 打开一个新的终端,使用 curl 向系统发送一个模拟的 OOM (内存溢出) 告警:
curl -X POST http://127.0.0.1:8000/webhook/alerts \
-H "Content-Type: application/json" \
-d '{
"alerts": [
{
"status": "firing",
"labels": {
"alertname": "PodOOMKilled",
"pod": "payment-service-v1-abcde",
"namespace": "production"
},
"annotations": {
"description": "Pod payment-service-v1-abcde in namespace production has been OOMKilled."
}
}
]
}'
此时观察服务端的控制台,您会看到 AI 被唤醒,开始进行根因分析,并将结果写入了 SQLite。
步骤 2:模拟前端看板拉取待审批列表
curl http://127.0.0.1:8000/api/incidents/pending
您会拿到一段 JSON 响应,里面包含了 AI 分析出的 ai_rca 和 ai_action_plan,并注意记录下该工单的 id(例如 1)。
步骤 3:模拟运维人员审批通过 发现 AI 给的排查命令很准确,直接审批通过:
curl -X POST http://127.0.0.1:8000/api/incidents/approve \
-H "Content-Type: application/json" \
-d '{
"record_id": 1,
"approver_name": "ZhangSan"
}'
此时观察服务端控制台,您会看到一条日志:[+] 知识闭环完成!高质量经验已存入 Qdrant 向量库。
至此,一个从触发感知到 AI 推理,再到人工协同闭环的 AIOps 生产级骨架就已经完全跑通了。如果要接入生产环境,需要把 SQLite 替换为公司的 PostgreSQL,把 Qdrant 配置为独立的服务地址。
琼杰笔记



评论前必须登录!
注册