# 📊 WxBot Enhanced 监控集成指南
 **深度集成WxBot Enhanced监控系统,实现插件级监控**
## 📋 目录导航
## 🎯 监控系统概述
### WxBot Enhanced 监控架构
WxBot Enhanced 内置了完整的监控系统，提供：
- 📊 实时指标收集 - 系统性能、业务指标、插件执行数据
- 🚨 多级告警机制 - Critical、High、Medium、Low四级告警
- 📈 Web可视化仪表板 - 实时数据展示和历史趋势分析
- 🔍 插件级细粒度监控 - 每个插件的执行时间、错误率、调用频次
- 📱 响应式界面 - 支持PC和移动端访问
### 监控数据类型
| 监控维度 | 数据类型 | 说明 |
|---|---|---|
| 系统指标 | CPU、内存、协程 | 运行时系统资源使用情况 |
| 业务指标 | 消息量、错误率、处理速度 | 核心业务运行状态 |
| 插件指标 | 执行时间、错误计数、调用频次 | 每个插件的详细性能数据 |
| 框架指标 | 事件缓冲区、匹配器数量、连接状态 | 框架层运行状态 |
## 📈 插件监控集成
### 基础监控集成
package myplugin
import (
	"fmt"
	"time"

	"github.com/ruk1ng001/wxbot/engine/control"
	"github.com/ruk1ng001/wxbot/engine/pkg/monitor"
	"github.com/ruk1ng001/wxbot/engine/robot"
)
// init registers the "myplugin" engine, binds its plugin name so monitoring
// code can resolve it from a matcher, and attaches the monitored handler.
func init() {
	opts := &control.Options{
		Alias:      "我的插件",
		DataFolder: "myplugin",
	}
	eng := control.Register("myplugin", opts)

	// Register the engine -> plugin-name mapping (important!): monitoredHandler
	// resolves the name via robot.GetEnginePluginName when recording metrics.
	robot.RegisterEnginePluginName(eng, "myplugin")

	// Messages prefixed with "test" are handled by monitoredHandler.
	eng.OnPrefix("test").Handle(monitoredHandler)
}
// monitoredHandler is an example handler that records its own metrics.
//
// The deferred function runs when the handler returns or panics. It calls
// recover() before recording anything, so a panic is reflected in the
// hasError flag passed to RecordPluginExecution. Recording first would report
// a panicking execution as a success. A recovered panic is also counted via
// IncrementErrorCount and logged. A business error (err != nil) marks the
// execution as failed and is reported back to the user.
func monitoredHandler(ctx *robot.Ctx) {
	start := time.Now()
	var hasError bool
	defer func() {
		duration := time.Since(start)
		// recover must be called directly in the deferred function.
		r := recover()
		pluginName := robot.GetEnginePluginName(ctx.GetMatcher().Engine)
		if r != nil {
			hasError = true
			monitor.GlobalMetrics.IncrementErrorCount(pluginName)
			log.Errorf("[%s] 插件执行异常: %v", pluginName, r)
		}
		// Record execution time and outcome, then count the processed message.
		monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, hasError)
		monitor.GlobalMetrics.RecordMessage()
	}()

	// Business logic.
	result, err := processMessage(ctx.MessageString())
	if err != nil {
		hasError = true
		ctx.ReplyText("处理失败: " + err.Error())
		return
	}
	ctx.ReplyText(result)
}
### 高级监控集成
// WithMonitoring is a decorator that wraps handler so every call is timed and
// reported to monitor.GlobalMetrics under pluginName.
//
// If handler panics, the panic is recovered, recorded as a failed execution,
// counted via IncrementErrorCount and logged, and the user gets a generic
// error reply. recover() runs before RecordPluginExecution so the recorded
// outcome reflects the panic.
func WithMonitoring(pluginName string, handler func(*robot.Ctx)) func(*robot.Ctx) {
	return func(ctx *robot.Ctx) {
		start := time.Now()
		defer func() {
			duration := time.Since(start)
			r := recover()
			panicked := r != nil
			monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, panicked)
			if !panicked {
				return
			}
			monitor.GlobalMetrics.IncrementErrorCount(pluginName)
			log.Errorf("[%s] 插件异常: %v", pluginName, r)
			// Reply last so the metrics are already recorded even if replying fails.
			ctx.ReplyText("系统异常,请稍后重试")
		}()
		handler(ctx)
	}
}
// init registers "myplugin" and wraps each command handler with
// WithMonitoring, so every command is timed and panic-protected.
func init() {
	eng := control.Register("myplugin", &control.Options{
		Alias: "我的插件",
	})

	// Command prefix -> handler, registered in this order.
	routes := []struct {
		prefix  string
		handler func(*robot.Ctx)
	}{
		{"cmd1", cmd1Handler},
		{"cmd2", cmd2Handler},
		{"cmd3", cmd3Handler},
	}
	for _, rt := range routes {
		eng.OnPrefix(rt.prefix).Handle(WithMonitoring("myplugin", rt.handler))
	}
}
### 批量监控注册
// PluginMonitorManager registers handlers for a single plugin so that each of
// them is automatically timed and panic-protected.
type PluginMonitorManager struct {
	pluginName string                    // name metrics are reported under
	metrics    *monitor.MetricsCollector // sink for execution/error metrics
}

// NewPluginMonitorManager returns a manager for pluginName that reports to
// monitor.GlobalMetrics.
func NewPluginMonitorManager(pluginName string) *PluginMonitorManager {
	pmm := new(PluginMonitorManager)
	pmm.pluginName = pluginName
	pmm.metrics = monitor.GlobalMetrics
	return pmm
}
// RegisterHandler attaches handler to engine for messages with prefix trigger.
// The handler is wrapped so that each call's duration and outcome are reported
// under pmm.pluginName. A panic in handler is recovered, counted via
// IncrementErrorCount, logged, and recorded as a failed execution. recover()
// runs before RecordPluginExecution so the recorded outcome reflects the panic.
func (pmm *PluginMonitorManager) RegisterHandler(engine *control.Engine, trigger string, handler func(*robot.Ctx)) {
	monitoredHandler := func(ctx *robot.Ctx) {
		start := time.Now()
		defer func() {
			duration := time.Since(start)
			r := recover()
			hasError := r != nil
			if hasError {
				pmm.metrics.IncrementErrorCount(pmm.pluginName)
				log.Errorf("[%s] 处理器异常: %v", pmm.pluginName, r)
			}
			pmm.metrics.RecordPluginExecution(pmm.pluginName, duration, hasError)
		}()
		handler(ctx)
	}
	engine.OnPrefix(trigger).Handle(monitoredHandler)
}
// init registers "myplugin" and uses a PluginMonitorManager to attach
// monitored handlers for the cmd1, cmd2 and cmd3 prefixes.
func init() {
	engine := control.Register("myplugin", &control.Options{Alias: "我的插件"})
	robot.RegisterEnginePluginName(engine, "myplugin")

	// Named pmm rather than "monitor" so the monitor package is not shadowed
	// inside this function.
	pmm := NewPluginMonitorManager("myplugin")

	// Batch registration.
	pmm.RegisterHandler(engine, "cmd1", cmd1Handler)
	pmm.RegisterHandler(engine, "cmd2", cmd2Handler)
	pmm.RegisterHandler(engine, "cmd3", cmd3Handler)
}
## 🔍 自定义指标
### 业务指标统计
// recordBusinessMetrics records the outcome of one business operation for the
// plugin that owns ctx's matcher. It counts a message, and on failure also
// bumps the plugin's error counter. Both outcomes are logged.
//
// It deliberately does not touch the connection status. Report that via
// monitor.GlobalMetrics.UpdateConnectionStatus from the code that actually
// observes connection changes. Overwriting it with "connected" on every
// business operation would hide a real disconnect.
func recordBusinessMetrics(ctx *robot.Ctx, operation string, success bool) {
	pluginName := robot.GetEnginePluginName(ctx.GetMatcher().Engine)

	// Count the operation.
	monitor.GlobalMetrics.RecordMessage()

	if !success {
		// Failures count towards the plugin's error metrics.
		monitor.GlobalMetrics.IncrementErrorCount(pluginName)
		log.Warnf("[%s] 操作失败: %s", pluginName, operation)
		return
	}
	// Success-specific metrics could be added here.
	log.Infof("[%s] 操作成功: %s", pluginName, operation)
}
// recordGameMetrics reports one finished game for pluginName. It counts a
// message, records the game's duration as a successful plugin execution, and
// logs the result (win or other ending) with score and duration.
func recordGameMetrics(pluginName string, gameResult GameResult) {
	// One game counts as one processed message.
	monitor.GlobalMetrics.RecordMessage()

	// Game length is reported as the plugin execution time; a game is never
	// recorded as an error here.
	elapsed := gameResult.Duration
	monitor.GlobalMetrics.RecordPluginExecution(pluginName, elapsed, false)

	// Only the headline differs between a win and any other ending.
	format := "[%s] 游戏结束 - 分数: %d, 时长: %v"
	if gameResult.IsWin {
		format = "[%s] 游戏胜利 - 分数: %d, 时长: %v"
	}
	log.Infof(format, pluginName, gameResult.Score, elapsed)
}
### 自定义监控中间件
// MonitoringMiddleware hooks into a handler invocation. BeforeExecution is
// called before the handler. AfterExecution is called afterwards with the
// measured duration and the handler's error (nil on success).
type MonitoringMiddleware interface {
	BeforeExecution(ctx *robot.Ctx, pluginName string)
	AfterExecution(ctx *robot.Ctx, pluginName string, duration time.Duration, err error)
}

// DefaultMonitoringMiddleware logs each message at debug level and reports
// execution time and errors to monitor.GlobalMetrics.
type DefaultMonitoringMiddleware struct{}

// BeforeExecution logs the incoming message text at debug level.
func (d *DefaultMonitoringMiddleware) BeforeExecution(ctx *robot.Ctx, pluginName string) {
	log.Debugf("[%s] 开始处理消息: %s", pluginName, ctx.MessageString())
}

// AfterExecution records the execution, marking it failed when err != nil.
// Failures also bump the plugin's error counter and are logged at error level.
// Successes are logged at debug level.
func (d *DefaultMonitoringMiddleware) AfterExecution(ctx *robot.Ctx, pluginName string, duration time.Duration, err error) {
	failed := err != nil
	monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, failed)
	if !failed {
		log.Debugf("[%s] 处理完成, 耗时: %v", pluginName, duration)
		return
	}
	monitor.GlobalMetrics.IncrementErrorCount(pluginName)
	log.Errorf("[%s] 处理失败: %v, 耗时: %v", pluginName, err, duration)
}
// WithMiddleware wraps a fallible handler with middleware. BeforeExecution
// runs first, then handler, then AfterExecution with the elapsed time and the
// handler's error.
//
// AfterExecution runs in a defer so it also fires when handler panics. The
// panic is recovered and passed to AfterExecution as an error, in the same way
// WithMonitoring recovers handler panics instead of letting them propagate.
func WithMiddleware(pluginName string, middleware MonitoringMiddleware, handler func(*robot.Ctx) error) func(*robot.Ctx) {
	return func(ctx *robot.Ctx) {
		middleware.BeforeExecution(ctx, pluginName)
		start := time.Now()

		var err error
		defer func() {
			duration := time.Since(start)
			if r := recover(); r != nil {
				err = fmt.Errorf("handler panic: %v", r)
			}
			middleware.AfterExecution(ctx, pluginName, duration, err)
		}()

		err = handler(ctx)
	}
}
## 🚨 告警配置
### 插件告警规则
// setupPluginAlerts registers two alert rules for pluginName with
// monitor.GlobalMetrics:
//   - "<plugin>_high_error_rate" (High severity, 5 min cooldown) fires when the
//     plugin's error count divided by getTotalPluginCalls exceeds 10%.
//   - "<plugin>_slow_execution" (Medium severity, 10 min cooldown) is a
//     placeholder whose condition always returns false until DashboardMetrics
//     carries per-plugin execution-time statistics.
func setupPluginAlerts(pluginName string) {
	const maxErrorRate = 0.1 // alert above a 10% error rate

	highErrorRate := func(metrics *monitor.DashboardMetrics) bool {
		errCount, tracked := metrics.ErrorsByPlugin[pluginName]
		if !tracked {
			return false
		}
		calls := getTotalPluginCalls(pluginName, metrics)
		if calls <= 0 {
			return false
		}
		return float64(errCount)/float64(calls) > maxErrorRate
	}

	rules := []monitor.AlertRule{
		{
			Name:      fmt.Sprintf("%s_high_error_rate", pluginName),
			Condition: highErrorRate,
			Severity:  monitor.AlertSeverityHigh,
			Cooldown:  5 * time.Minute,
			Message:   fmt.Sprintf("插件[%s]错误率过高,请检查插件状态", pluginName),
		},
		{
			Name: fmt.Sprintf("%s_slow_execution", pluginName),
			// TODO: needs per-plugin execution-time stats in DashboardMetrics;
			// until then this rule never fires.
			Condition: func(*monitor.DashboardMetrics) bool { return false },
			Severity:  monitor.AlertSeverityMedium,
			Cooldown:  10 * time.Minute,
			Message:   fmt.Sprintf("插件[%s]执行时间过长,可能存在性能问题", pluginName),
		},
	}

	// Register the rules.
	for _, rule := range rules {
		monitor.GlobalMetrics.AddAlertRule(rule)
	}
}
// init registers "myplugin", binds its plugin name, and installs the
// plugin-specific alert rules.
func init() {
	engine := control.Register("myplugin", &control.Options{Alias: "我的插件"})
	// Bind the engine to its plugin name so handlers can resolve it via
	// robot.GetEnginePluginName. This is the same name the alert rules use.
	robot.RegisterEnginePluginName(engine, "myplugin")
	// Install the plugin-specific alert rules.
	setupPluginAlerts("myplugin")
}
### 动态告警阈值
// DynamicAlertConfig holds per-plugin alert thresholds that UpdateThresholds
// re-derives from historical data.
type DynamicAlertConfig struct {
	PluginName string
	// ErrorRateThreshold is a fraction (0.1 == 10%); it is logged as a percentage.
	ErrorRateThreshold     float64
	SlowExecutionThreshold time.Duration
	// NOTE(review): not read by UpdateThresholds. Presumably this is the minimum
	// number of samples before derived thresholds should be trusted. TODO confirm,
	// and enforce it once PluginHistoricalData exposes a sample count.
	MinSampleSize int
}

// UpdateThresholds re-derives the thresholds from historicalData:
//   - error-rate threshold     = 2x the historical average error rate
//   - slow-execution threshold = 3x the historical average execution time
//
// If a baseline is not positive (for example, a plugin that has never failed),
// the derived threshold would be zero and would presumably flag essentially
// every error or execution. In that case the existing threshold is kept.
// The resulting thresholds are logged.
func (dac *DynamicAlertConfig) UpdateThresholds(historicalData PluginHistoricalData) {
	const (
		errorRateFactor     = 2
		executionTimeFactor = 3
	)

	if avgErrorRate := historicalData.AverageErrorRate; avgErrorRate > 0 {
		dac.ErrorRateThreshold = avgErrorRate * errorRateFactor
	}
	if avgExecutionTime := historicalData.AverageExecutionTime; avgExecutionTime > 0 {
		dac.SlowExecutionThreshold = time.Duration(float64(avgExecutionTime) * executionTimeFactor)
	}

	log.Infof("[%s] 更新告警阈值 - 错误率: %.2f%%, 执行时间: %v",
		dac.PluginName, dac.ErrorRateThreshold*100, dac.SlowExecutionThreshold)
}
## 📊 Web仪表板
### 访问监控仪表板
监控仪表板地址:http://localhost:7601/monitor.html
### 自定义监控数据
// GetCustomDashboardData assembles plugin-specific dashboard data for
// pluginName. It includes the name, execution count, average response time,
// success rate, 24 hourly stats (last 24 hours trend), the 10 most recent
// errors (presumably, per getPluginRecentErrors) and active users.
func GetCustomDashboardData(pluginName string) map[string]interface{} {
	// The helper calls in a composite literal are evaluated left to right,
	// i.e. in the order listed here.
	return map[string]interface{}{
		// Plugin-specific statistics.
		"plugin_name":           pluginName,
		"total_executions":      getPluginExecutionCount(pluginName),
		"average_response_time": getPluginAverageResponseTime(pluginName),
		"success_rate":          getPluginSuccessRate(pluginName),
		// Trend over the last 24 hours.
		"hourly_stats": getPluginHourlyStats(pluginName, 24),
		// Error statistics.
		"recent_errors": getPluginRecentErrors(pluginName, 10),
		// User activity.
		"active_users": getPluginActiveUsers(pluginName),
	}
}
// registerPluginMonitoringAPI registers an admin-only "stats" command on
// engine. The command replies with a plain-text summary of pluginName's
// dashboard data. A real JSON/HTTP endpoint would need the web framework;
// this chat command only demonstrates the data shape.
func registerPluginMonitoringAPI(engine *control.Engine, pluginName string) {
	pluginStatsHandler := func(ctx *robot.Ctx) {
		stats := GetCustomDashboardData(pluginName)
		// Use real newlines ("\n") so each field is on its own line of the reply.
		response := fmt.Sprintf("插件[%s]统计数据:\n总执行次数: %v\n平均响应时间: %v\n成功率: %v\n",
			pluginName,
			stats["total_executions"],
			stats["average_response_time"],
			stats["success_rate"],
		)
		ctx.ReplyText(response)
	}
	engine.OnCommand("stats", robot.AdminPermission).Handle(pluginStatsHandler)
}
### 实时数据推送
// MonitoringWebSocket pushes one plugin's metrics to a set of WebSocket
// connections keyed by connection ID (conceptual implementation).
type MonitoringWebSocket struct {
	pluginName  string
	connections map[string]*websocket.Conn
	// mutex guards connections. BroadcastPluginMetrics takes the write lock
	// because it deletes failed connections from the map, and because a
	// connection must not be written by two broadcasts at once
	// (gorilla/websocket supports only one concurrent writer per connection).
	mutex sync.RWMutex
}

// BroadcastPluginMetrics sends metrics to every connection, wrapped in a
// message with "type" = "plugin_metrics", "plugin_name", "timestamp" and
// "data" keys. A connection whose write fails is logged, closed and removed
// from the set.
func (mws *MonitoringWebSocket) BroadcastPluginMetrics(metrics PluginMetrics) {
	// Write lock, not RLock: mutating the map under a shared read lock is a
	// data race with any concurrent broadcast.
	mws.mutex.Lock()
	defer mws.mutex.Unlock()

	message := map[string]interface{}{
		"type":        "plugin_metrics",
		"plugin_name": mws.pluginName,
		"timestamp":   time.Now(),
		"data":        metrics,
	}
	for connID, conn := range mws.connections {
		if err := conn.WriteJSON(message); err != nil {
			log.Warnf("推送失败,移除连接: %s, 错误: %v", connID, err)
			// Best effort: the connection is already unusable.
			_ = conn.Close()
			// Deleting the current key while ranging over a map is safe in Go.
			delete(mws.connections, connID)
		}
	}
}
## 🔧 监控最佳实践
### 1. 监控数据粒度控制
// MonitoringLevel selects how much WithConfigurableMonitoring records. Levels
// are ordered: each level also does everything the lower levels do.
type MonitoringLevel int

const (
	// MonitoringLevelNone disables monitoring; the handler is called directly.
	MonitoringLevelNone MonitoringLevel = iota
	// MonitoringLevelBasic records execution time and outcome.
	MonitoringLevelBasic
	// MonitoringLevelDetail additionally logs user, group and duration per call.
	MonitoringLevelDetail
	// MonitoringLevelDebug additionally logs the incoming message text.
	MonitoringLevelDebug
)

// WithConfigurableMonitoring wraps handler with monitoring at the given level.
//
// At MonitoringLevelNone the handler runs unwrapped, so panics propagate. At
// any other level a panic is recovered, counted via IncrementErrorCount and
// logged. recover() runs before RecordPluginExecution so the recorded outcome
// reflects the panic.
func WithConfigurableMonitoring(pluginName string, level MonitoringLevel, handler func(*robot.Ctx)) func(*robot.Ctx) {
	return func(ctx *robot.Ctx) {
		if level == MonitoringLevelNone {
			handler(ctx)
			return
		}

		start := time.Now()

		// Debug level: log the incoming message.
		if level >= MonitoringLevelDebug {
			log.Debugf("[%s] 收到消息: %s", pluginName, ctx.MessageString())
		}

		defer func() {
			duration := time.Since(start)
			r := recover()
			hasError := r != nil

			// Basic monitoring.
			if level >= MonitoringLevelBasic {
				monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, hasError)
			}

			// Detailed monitoring.
			if level >= MonitoringLevelDetail {
				userID := ctx.Event.FinalFromWxId
				groupID := ctx.Event.FromWxId
				log.Infof("[%s] 用户:%s 群组:%s 耗时:%v", pluginName, userID, groupID, duration)
			}

			if hasError {
				monitor.GlobalMetrics.IncrementErrorCount(pluginName)
				log.Errorf("[%s] 异常: %v", pluginName, r)
			}
		}()

		handler(ctx)
	}
}
### 2. 性能监控优化
// HighPerformanceMonitor batches execution metrics for one plugin to reduce
// lock contention. Callers enqueue with RecordExecution, which never blocks. A
// background goroutine flushes the queue to monitor.GlobalMetrics in batches
// of hpmBatchSize, or every hpmFlushInterval, whichever comes first. Call
// Close to stop the goroutine and flush whatever is still pending.
type HighPerformanceMonitor struct {
	pluginName string
	metrics    chan MetricData // queue from RecordExecution to processMetrics
	buffer     []MetricData    // pending batch; only touched by processMetrics
	mutex      sync.Mutex      // held while a batch is flushed

	done      chan struct{} // closed by Close to request shutdown
	exited    chan struct{} // closed by processMetrics after its final flush
	closeOnce sync.Once
}

// Tuning knobs for HighPerformanceMonitor.
const (
	hpmQueueSize     = 1000            // RecordExecution drops metrics once this many are queued
	hpmBatchSize     = 100             // flush as soon as this many metrics are buffered
	hpmFlushInterval = 5 * time.Second // otherwise flush at least this often
)

// MetricData is a single recorded plugin execution.
type MetricData struct {
	Timestamp time.Time
	Duration  time.Duration
	HasError  bool
	UserID    string
	GroupID   string
}

// NewHighPerformanceMonitor creates a monitor for pluginName and starts its
// background flushing goroutine. Call Close when the monitor is no longer
// needed so the goroutine exits.
func NewHighPerformanceMonitor(pluginName string) *HighPerformanceMonitor {
	hpm := &HighPerformanceMonitor{
		pluginName: pluginName,
		metrics:    make(chan MetricData, hpmQueueSize), // buffered queue
		buffer:     make([]MetricData, 0, hpmBatchSize),
		done:       make(chan struct{}),
		exited:     make(chan struct{}),
	}
	// Process metrics asynchronously.
	go hpm.processMetrics()
	return hpm
}

// RecordExecution enqueues one execution without blocking. If the queue is
// full, the metric is dropped and a warning is logged.
func (hpm *HighPerformanceMonitor) RecordExecution(duration time.Duration, hasError bool, userID, groupID string) {
	select {
	case hpm.metrics <- MetricData{
		Timestamp: time.Now(),
		Duration:  duration,
		HasError:  hasError,
		UserID:    userID,
		GroupID:   groupID,
	}:
	default:
		// Drop when the queue is full rather than block the caller.
		log.Warnf("[%s] 监控通道已满,丢弃指标", hpm.pluginName)
	}
}

// Close stops the background goroutine after moving every already-queued
// metric into the buffer and flushing it. Close is safe to call more than once
// and blocks until the final flush has completed. Metrics recorded after Close
// are never flushed.
func (hpm *HighPerformanceMonitor) Close() {
	hpm.closeOnce.Do(func() { close(hpm.done) })
	<-hpm.exited
}

// processMetrics is the background loop. It buffers queued metrics, flushes
// when the batch is full or the ticker fires, and exits after a final
// drain-and-flush once done is closed.
func (hpm *HighPerformanceMonitor) processMetrics() {
	defer close(hpm.exited)

	ticker := time.NewTicker(hpmFlushInterval)
	defer ticker.Stop()

	for {
		select {
		case metric := <-hpm.metrics:
			hpm.buffer = append(hpm.buffer, metric)
			// Flush in bulk once the batch is full.
			if len(hpm.buffer) >= hpmBatchSize {
				hpm.flushMetrics()
			}
		case <-ticker.C:
			// Periodic flush (a no-op when the buffer is empty).
			hpm.flushMetrics()
		case <-hpm.done:
			hpm.drainQueue()
			hpm.flushMetrics()
			return
		}
	}
}

// drainQueue moves every metric that is already queued into the buffer,
// without blocking.
func (hpm *HighPerformanceMonitor) drainQueue() {
	for {
		select {
		case metric := <-hpm.metrics:
			hpm.buffer = append(hpm.buffer, metric)
		default:
			return
		}
	}
}

// flushMetrics submits every buffered metric to monitor.GlobalMetrics and
// empties the buffer while keeping its capacity.
func (hpm *HighPerformanceMonitor) flushMetrics() {
	hpm.mutex.Lock()
	defer hpm.mutex.Unlock()

	if len(hpm.buffer) == 0 {
		return
	}
	// Submit the batch to the monitoring system.
	for _, metric := range hpm.buffer {
		monitor.GlobalMetrics.RecordPluginExecution(hpm.pluginName, metric.Duration, metric.HasError)
	}
	// Reset the buffer.
	hpm.buffer = hpm.buffer[:0]
}
### 3. 监控数据存储
// MonitoringStorage persists monitoring data (plugin execution metrics)
// through GORM.
type MonitoringStorage struct {
// db is the GORM handle used for batch inserts and aggregate queries.
db *gorm.DB
}
// PluginMetricRecord is the persisted monitoring model: one row per recorded
// plugin execution. plugin_name, user_id, group_id and created_at are indexed
// (per the gorm tags); GetPluginStats filters on plugin_name and created_at.
type PluginMetricRecord struct {
ID uint `gorm:"primaryKey"`
PluginName string `gorm:"column:plugin_name;index"`
UserID string `gorm:"column:user_id;index"`
GroupID string `gorm:"column:group_id;index"`
Duration int64 `gorm:"column:duration"` // milliseconds
HasError bool `gorm:"column:has_error"`
// NOTE(review): SaveMetrics never fills ErrorMsg (MetricData carries no
// error text), so this column stays empty for rows it writes.
ErrorMsg string `gorm:"column:error_msg"`
CreatedAt time.Time `gorm:"column:created_at;index"`
}
// SaveMetrics converts metrics into PluginMetricRecord rows for pluginName and
// inserts them in batches of 100. Durations are truncated to whole
// milliseconds. An empty slice is a no-op and returns nil.
func (ms *MonitoringStorage) SaveMetrics(pluginName string, metrics []MetricData) error {
	if len(metrics) == 0 {
		return nil
	}

	rows := make([]PluginMetricRecord, 0, len(metrics))
	for _, m := range metrics {
		rows = append(rows, PluginMetricRecord{
			PluginName: pluginName,
			UserID:     m.UserID,
			GroupID:    m.GroupID,
			Duration:   m.Duration.Milliseconds(),
			HasError:   m.HasError,
			CreatedAt:  m.Timestamp,
		})
	}

	// Batch insert.
	const batchSize = 100
	return ms.db.CreateInBatches(rows, batchSize).Error
}
// GetPluginStats aggregates pluginName's records created at or after since
// into total calls, average duration (milliseconds, as stored) and error count.
//
// AVG over an empty set is NULL in SQL. It is COALESCEd to 0 so that scanning
// into PluginStatistics is well-defined when there are no matching rows.
// On query failure it returns a nil *PluginStatistics together with the
// wrapped error.
func (ms *MonitoringStorage) GetPluginStats(pluginName string, since time.Time) (*PluginStatistics, error) {
	var stats PluginStatistics
	err := ms.db.Model(&PluginMetricRecord{}).
		Select("COUNT(*) as total_calls, COALESCE(AVG(duration), 0) as avg_duration, COUNT(CASE WHEN has_error THEN 1 END) as error_count").
		Where("plugin_name = ? AND created_at >= ?", pluginName, since).
		Scan(&stats).Error
	if err != nil {
		return nil, fmt.Errorf("querying stats for plugin %q: %w", pluginName, err)
	}
	return &stats, nil
}
## 📚 API参考
### 监控API接口
// Global monitoring API (all calls go through monitor.GlobalMetrics).
// Record one plugin execution: its duration and whether it failed.
monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, hasError)
// Count one processed message.
monitor.GlobalMetrics.RecordMessage()
// Bump the error counter of a plugin.
monitor.GlobalMetrics.IncrementErrorCount(pluginName)
// Set the connection status (this guide only shows the value "connected").
monitor.GlobalMetrics.UpdateConnectionStatus(status)
// Plugin-name mapping: bind an engine to a plugin name, then look it up again.
robot.RegisterEnginePluginName(engine, pluginName)
pluginName := robot.GetEnginePluginName(engine)
// Reading monitoring data: presumably the current metrics and the currently
// active alerts, going by the method names. Confirm against the monitor package.
metrics := monitor.GlobalMetrics.GetCurrentMetrics()
alerts := monitor.GlobalMetrics.GetActiveAlerts()
### 监控配置
// MonitoringConfig is the monitoring configuration structure, mapped to YAML
// keys via its yaml struct tags.
type MonitoringConfig struct {
Enabled bool `yaml:"enabled"`
// NOTE(review): presumably maps onto the MonitoringLevel constants. Confirm.
Level string `yaml:"level"` // none/basic/detail/debug
MetricsRetention time.Duration `yaml:"metrics_retention"`
AlertCooldown time.Duration `yaml:"alert_cooldown"`
WebDashboard bool `yaml:"web_dashboard"`
StorageEnabled bool `yaml:"storage_enabled"`
}
## 📚 相关文档
**📊 让数据指导决策,让监控提升质量!** **用监控系统构建可观测的微信机器人!**