跳转至

📊 WxBot Enhanced 监控集成指南

![Monitoring Integration](../images/monitoring-banner.png) **深度集成WxBot Enhanced监控系统,实现插件级监控**

📋 目录导航


🎯 监控系统概述

WxBot Enhanced监控架构

WxBot Enhanced内置了完整的监控系统,提供:

  • 📊 实时指标收集 - 系统性能、业务指标、插件执行数据
  • 🚨 多级告警机制 - Critical、High、Medium、Low四级告警
  • 📈 Web可视化仪表板 - 实时数据展示和历史趋势分析
  • 🔍 插件级细粒度监控 - 每个插件的执行时间、错误率、调用频次
  • 📱 响应式界面 - 支持PC和移动端访问

监控数据类型

| 监控维度 | 数据类型 | 说明 |
| --- | --- | --- |
| 系统指标 | CPU、内存、协程 | 运行时系统资源使用情况 |
| 业务指标 | 消息量、错误率、处理速度 | 核心业务运行状态 |
| 插件指标 | 执行时间、错误计数、调用频次 | 每个插件的详细性能数据 |
| 框架指标 | 事件缓冲区、匹配器数量、连接状态 | 框架层运行状态 |

📈 插件监控集成

基础监控集成

package myplugin

import (
    "time"
    "github.com/ruk1ng001/wxbot/engine/control"
    "github.com/ruk1ng001/wxbot/engine/robot"
    "github.com/ruk1ng001/wxbot/engine/pkg/monitor"
)

// init registers the "myplugin" plugin, records its engine -> name mapping,
// and routes messages prefixed with "test" to monitoredHandler.
func init() {
    opts := &control.Options{
        Alias:      "我的插件",
        DataFolder: "myplugin",
    }
    engine := control.Register("myplugin", opts)

    // Register the plugin-name mapping (important): monitoredHandler looks
    // the name up again through robot.GetEnginePluginName.
    robot.RegisterEnginePluginName(engine, "myplugin")

    // Register the handler.
    engine.OnPrefix("test").Handle(monitoredHandler)
}

// monitoredHandler is a handler that reports its own execution metrics.
//
// On exit (normal return or panic) the deferred function recovers any panic,
// then records the plugin execution (duration and error flag) and one
// processed message on monitor.GlobalMetrics. A processing error is reported
// to the user and marks the execution as failed.
func monitoredHandler(ctx *robot.Ctx) {
    start := time.Now()
    var hasError bool

    defer func() {
        duration := time.Since(start)
        pluginName := robot.GetEnginePluginName(ctx.GetMatcher().Engine)

        // Recover before recording: otherwise a panicking execution would be
        // recorded with hasError still false.
        if r := recover(); r != nil {
            hasError = true
            monitor.GlobalMetrics.IncrementErrorCount(pluginName)
            log.Errorf("[%s] 插件执行异常: %v", pluginName, r)
        }

        // Record the plugin execution metrics.
        monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, hasError)

        // Record that a message was processed.
        monitor.GlobalMetrics.RecordMessage()
    }()

    // Business logic.
    result, err := processMessage(ctx.MessageString())
    if err != nil {
        hasError = true
        ctx.ReplyText("处理失败: " + err.Error())
        return
    }

    ctx.ReplyText(result)
}

高级监控集成

// WithMonitoring is a monitoring decorator: it wraps handler so that every
// invocation is timed and recorded on monitor.GlobalMetrics under pluginName.
//
// A panic raised by handler is recovered, counted as an error, logged, and
// answered with a generic failure reply; it does not propagate to the caller.
func WithMonitoring(pluginName string, handler func(*robot.Ctx)) func(*robot.Ctx) {
    return func(ctx *robot.Ctx) {
        start := time.Now()
        var hasError bool

        defer func() {
            duration := time.Since(start)

            // Recover before recording: otherwise a panicking execution
            // would be recorded with hasError still false.
            if r := recover(); r != nil {
                hasError = true
                monitor.GlobalMetrics.IncrementErrorCount(pluginName)
                log.Errorf("[%s] 插件异常: %v", pluginName, r)
                ctx.ReplyText("系统异常,请稍后重试")
            }

            monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, hasError)
        }()

        handler(ctx)
    }
}

// Using the monitoring decorator: every handler registered here is wrapped
// with WithMonitoring, so all of them are monitored automatically.
func init() {
    engine := control.Register("myplugin", &control.Options{
        Alias: "我的插件",
    })

    // Prefix -> handler table; registration happens in the listed order.
    routes := []struct {
        prefix  string
        handler func(*robot.Ctx)
    }{
        {"cmd1", cmd1Handler},
        {"cmd2", cmd2Handler},
        {"cmd3", cmd3Handler},
    }

    for _, route := range routes {
        engine.OnPrefix(route.prefix).Handle(WithMonitoring("myplugin", route.handler))
    }
}

批量监控注册

// PluginMonitorManager is a batch monitoring manager: it registers handlers
// for one plugin and records their executions on a metrics collector.
type PluginMonitorManager struct {
    pluginName string                    // name under which executions are recorded
    metrics    *monitor.MetricsCollector // collector that receives the metrics
}

// NewPluginMonitorManager returns a manager for pluginName that reports to
// monitor.GlobalMetrics.
func NewPluginMonitorManager(pluginName string) *PluginMonitorManager {
    mgr := new(PluginMonitorManager)
    mgr.pluginName = pluginName
    mgr.metrics = monitor.GlobalMetrics
    return mgr
}

// RegisterHandler registers handler on engine for messages starting with
// trigger, wrapped so that each invocation is timed and recorded on the
// manager's metrics collector under the manager's plugin name.
//
// A panic raised by handler is recovered, counted as an error and logged.
func (pmm *PluginMonitorManager) RegisterHandler(engine *control.Engine, trigger string, handler func(*robot.Ctx)) {
    wrapped := func(ctx *robot.Ctx) {
        start := time.Now()
        var hasError bool

        defer func() {
            duration := time.Since(start)

            // Recover before recording: otherwise a panicking execution
            // would be recorded with hasError still false.
            if r := recover(); r != nil {
                hasError = true
                pmm.metrics.IncrementErrorCount(pmm.pluginName)
                log.Errorf("[%s] 处理器异常: %v", pmm.pluginName, r)
            }

            pmm.metrics.RecordPluginExecution(pmm.pluginName, duration, hasError)
        }()

        handler(ctx)
    }

    engine.OnPrefix(trigger).Handle(wrapped)
}

// Usage example: register several handlers through a PluginMonitorManager.
func init() {
    engine := control.Register("myplugin", &control.Options{Alias: "我的插件"})
    robot.RegisterEnginePluginName(engine, "myplugin")

    // Named mgr rather than monitor so the imported monitor package is not
    // shadowed inside this function.
    mgr := NewPluginMonitorManager("myplugin")

    // Batch registration.
    mgr.RegisterHandler(engine, "cmd1", cmd1Handler)
    mgr.RegisterHandler(engine, "cmd2", cmd2Handler)
    mgr.RegisterHandler(engine, "cmd3", cmd3Handler)
}

🔍 自定义指标

业务指标统计

// recordBusinessMetrics records one business operation for the plugin that
// owns ctx's matcher engine: it counts a message, logs the outcome (counting
// an error on failure), and then sets the connection status to "connected".
func recordBusinessMetrics(ctx *robot.Ctx, operation string, success bool) {
    name := robot.GetEnginePluginName(ctx.GetMatcher().Engine)

    // Count the operation.
    monitor.GlobalMetrics.RecordMessage()

    // Record the outcome.
    if !success {
        // Failure: bump the plugin's error counter.
        monitor.GlobalMetrics.IncrementErrorCount(name)
        log.Warnf("[%s] 操作失败: %s", name, operation)
    } else {
        // Success: only logged for now; success metrics could be added here.
        log.Infof("[%s] 操作成功: %s", name, operation)
    }

    // Update the connection status (if relevant).
    monitor.GlobalMetrics.UpdateConnectionStatus("connected")
}

// recordGameMetrics records metrics specific to game plugins: one message,
// one successful plugin execution lasting gameResult.Duration, and a log line
// describing the result.
func recordGameMetrics(pluginName string, gameResult GameResult) {
    // Count the game.
    monitor.GlobalMetrics.RecordMessage()

    // Record the game duration as a non-error execution.
    elapsed := gameResult.Duration
    monitor.GlobalMetrics.RecordPluginExecution(pluginName, elapsed, false)

    // Log the result (custom metrics could be added here); only the wording
    // differs between a win and any other outcome.
    resultFormat := "[%s] 游戏结束 - 分数: %d, 时长: %v"
    if gameResult.IsWin {
        resultFormat = "[%s] 游戏胜利 - 分数: %d, 时长: %v"
    }
    log.Infof(resultFormat, pluginName, gameResult.Score, elapsed)
}

自定义监控中间件

// MonitoringMiddleware is the monitoring middleware interface: hooks invoked
// around a handler execution.
type MonitoringMiddleware interface {
    // BeforeExecution is called before the handler runs.
    BeforeExecution(ctx *robot.Ctx, pluginName string)
    // AfterExecution is called after the handler returns, with the measured
    // duration and the error the handler returned (nil on success).
    AfterExecution(ctx *robot.Ctx, pluginName string, duration time.Duration, err error)
}

// DefaultMonitoringMiddleware is the default monitoring middleware. It holds
// no state.
type DefaultMonitoringMiddleware struct{}

// BeforeExecution logs, at debug level, the message that is about to be
// handled by pluginName.
func (dmm *DefaultMonitoringMiddleware) BeforeExecution(ctx *robot.Ctx, pluginName string) {
    incoming := ctx.MessageString()
    log.Debugf("[%s] 开始处理消息: %s", pluginName, incoming)
}

// AfterExecution records the execution on monitor.GlobalMetrics and logs the
// outcome. A non-nil err marks the execution as failed and additionally
// increments the plugin's error counter.
func (dmm *DefaultMonitoringMiddleware) AfterExecution(ctx *robot.Ctx, pluginName string, duration time.Duration, err error) {
    failed := err != nil
    monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, failed)

    if !failed {
        log.Debugf("[%s] 处理完成, 耗时: %v", pluginName, duration)
        return
    }

    monitor.GlobalMetrics.IncrementErrorCount(pluginName)
    log.Errorf("[%s] 处理失败: %v, 耗时: %v", pluginName, err, duration)
}

// WithMiddleware wraps an error-returning handler with a monitoring
// middleware: BeforeExecution runs first, the handler is timed, and
// AfterExecution receives the elapsed time and the handler's error.
//
// AfterExecution is called only when handler returns; a panic in handler
// propagates without it.
func WithMiddleware(pluginName string, middleware MonitoringMiddleware, handler func(*robot.Ctx) error) func(*robot.Ctx) {
    wrapped := func(ctx *robot.Ctx) {
        middleware.BeforeExecution(ctx, pluginName)

        begin := time.Now()
        runErr := handler(ctx)
        elapsed := time.Since(begin)

        middleware.AfterExecution(ctx, pluginName, elapsed, runErr)
    }
    return wrapped
}

🚨 告警配置

插件告警规则

// setupPluginAlerts registers the plugin-specific alert rules for pluginName
// on monitor.GlobalMetrics: a high-error-rate rule and a slow-execution rule.
func setupPluginAlerts(pluginName string) {
    rules := []monitor.AlertRule{
        // Error rate too high.
        {
            Name: fmt.Sprintf("%s_high_error_rate", pluginName),
            Condition: func(metrics *monitor.DashboardMetrics) bool {
                errCount, ok := metrics.ErrorsByPlugin[pluginName]
                if !ok {
                    return false
                }
                totalCalls := getTotalPluginCalls(pluginName, metrics)
                if totalCalls > 0 {
                    // Fire above a 10% error rate.
                    return float64(errCount)/float64(totalCalls) > 0.1
                }
                return false
            },
            Severity: monitor.AlertSeverityHigh,
            Cooldown: 5 * time.Minute,
            Message:  fmt.Sprintf("插件[%s]错误率过高,请检查插件状态", pluginName),
        },
        // Execution taking too long.
        {
            Name: fmt.Sprintf("%s_slow_execution", pluginName),
            Condition: func(metrics *monitor.DashboardMetrics) bool {
                // Placeholder: the metrics structure must be extended with
                // per-plugin execution-time statistics before this rule can
                // be implemented, so it never fires.
                return false
            },
            Severity: monitor.AlertSeverityMedium,
            Cooldown: 10 * time.Minute,
            Message:  fmt.Sprintf("插件[%s]执行时间过长,可能存在性能问题", pluginName),
        },
    }

    // Register the alert rules in declaration order.
    for _, rule := range rules {
        monitor.GlobalMetrics.AddAlertRule(rule)
    }
}

// Set up the alerts when the plugin is initialized.
func init() {
    engine := control.Register("myplugin", &control.Options{Alias: "我的插件"})

    // Register the engine -> plugin-name mapping, as the other init examples
    // do. This also uses engine: an unused local variable is a compile error
    // in Go.
    robot.RegisterEnginePluginName(engine, "myplugin")

    // Install the plugin-specific alert rules.
    setupPluginAlerts("myplugin")
}

动态告警阈值

// DynamicAlertConfig holds alert thresholds for one plugin that can be
// adjusted at runtime.
type DynamicAlertConfig struct {
    // PluginName identifies the plugin the thresholds apply to.
    PluginName string
    // ErrorRateThreshold is the error rate expressed as a fraction
    // (it is multiplied by 100 when logged as a percentage).
    ErrorRateThreshold float64
    // SlowExecutionThreshold is the execution-time threshold.
    SlowExecutionThreshold time.Duration
    // MinSampleSize is a sample-size setting; it is not read by the code
    // shown in this guide.
    MinSampleSize int
}

// UpdateThresholds recomputes the alert thresholds from historical data and
// logs the new values: the error-rate threshold becomes twice the historical
// average error rate, and the slow-execution threshold becomes three times
// the historical average execution time.
func (dac *DynamicAlertConfig) UpdateThresholds(historicalData PluginHistoricalData) {
    // Error-rate threshold = historical average * 2.
    dac.ErrorRateThreshold = historicalData.AverageErrorRate * 2

    // Execution-time threshold = historical average * 3.
    tripled := float64(historicalData.AverageExecutionTime) * 3
    dac.SlowExecutionThreshold = time.Duration(tripled)

    log.Infof("[%s] 更新告警阈值 - 错误率: %.2f%%, 执行时间: %v",
        dac.PluginName, dac.ErrorRateThreshold*100, dac.SlowExecutionThreshold)
}

📊 Web仪表板

访问监控仪表板

监控仪表板地址:http://localhost:7601/monitor.html

自定义监控数据

// GetCustomDashboardData assembles the extended dashboard data for
// pluginName as a string-keyed map. The helper functions are called in the
// order the keys are listed.
func GetCustomDashboardData(pluginName string) map[string]interface{} {
    return map[string]interface{}{
        // Plugin-specific statistics.
        "plugin_name":           pluginName,
        "total_executions":      getPluginExecutionCount(pluginName),
        "average_response_time": getPluginAverageResponseTime(pluginName),
        "success_rate":          getPluginSuccessRate(pluginName),

        // Trend over the last 24 hours.
        "hourly_stats": getPluginHourlyStats(pluginName, 24),

        // Error statistics (helper is called with a limit argument of 10).
        "recent_errors": getPluginRecentErrors(pluginName, 10),

        // User activity.
        "active_users": getPluginActiveUsers(pluginName),
    }
}

// registerPluginMonitoringAPI registers a "stats" command (requiring
// robot.AdminPermission) on engine that replies with a text summary of the
// plugin's dashboard data.
//
// Note: a real monitoring API endpoint would have to be implemented in a web
// framework; this only demonstrates the data structure.
func registerPluginMonitoringAPI(engine *control.Engine, pluginName string) {
    pluginStatsHandler := func(ctx *robot.Ctx) {
        stats := GetCustomDashboardData(pluginName)

        // Render as plain text (a JSON response would need HTTP framework
        // support). "\n" is a real line break; the escaped form "\\n" would
        // be sent to the user as a literal backslash followed by 'n'.
        response := fmt.Sprintf(
            "插件[%s]统计数据:\n总执行次数: %v\n平均响应时间: %v\n成功率: %v\n",
            pluginName,
            stats["total_executions"],
            stats["average_response_time"],
            stats["success_rate"],
        )

        ctx.ReplyText(response)
    }

    engine.OnCommand("stats", robot.AdminPermission).Handle(pluginStatsHandler)
}

实时数据推送

// MonitoringWebSocket pushes plugin metrics over WebSocket connections
// (conceptual implementation).
type MonitoringWebSocket struct {
    // pluginName is included in every pushed message.
    pluginName string
    // connections holds the open connections keyed by connection ID.
    connections map[string]*websocket.Conn
    // mutex guards connections.
    mutex sync.RWMutex
}

// BroadcastPluginMetrics sends metrics as a JSON "plugin_metrics" message to
// every registered connection. A connection whose write fails is logged and
// removed from the connection set.
func (mws *MonitoringWebSocket) BroadcastPluginMetrics(metrics PluginMetrics) {
    // Take the exclusive lock, not the read lock: failed connections are
    // deleted from the map below, and mutating the map while other
    // goroutines hold the read lock is a data race.
    mws.mutex.Lock()
    defer mws.mutex.Unlock()

    message := map[string]interface{}{
        "type":        "plugin_metrics",
        "plugin_name": mws.pluginName,
        "timestamp":   time.Now(),
        "data":        metrics,
    }

    for connID, conn := range mws.connections {
        if err := conn.WriteJSON(message); err != nil {
            log.Warnf("推送失败,移除连接: %s", connID)
            // Deleting the current key while ranging over a map is safe in Go.
            delete(mws.connections, connID)
        }
    }
}

🔧 监控最佳实践

1. 监控数据粒度控制

// MonitoringLevel configures how much monitoring data is collected. Levels
// are ordered, so they can be compared with >=.
type MonitoringLevel int

const (
    // MonitoringLevelNone is the zero value, the lowest level.
    MonitoringLevelNone MonitoringLevel = iota
    // MonitoringLevelBasic covers basic metrics: execution time and error
    // counts.
    MonitoringLevelBasic
    // MonitoringLevelDetail covers detailed metrics: parameter recording and
    // return-value analysis.
    MonitoringLevelDetail
    // MonitoringLevelDebug covers debug metrics: full request/response logs.
    MonitoringLevelDebug
)

// WithConfigurableMonitoring wraps handler with monitoring whose verbosity is
// chosen by level:
//
//   - MonitoringLevelNone: handler is called directly; nothing is recorded
//     and panics are not recovered.
//   - MonitoringLevelBasic and above: the execution is recorded on
//     monitor.GlobalMetrics.
//   - MonitoringLevelDetail and above: the sender IDs and duration are logged.
//   - MonitoringLevelDebug and above: the incoming message is logged.
//
// At any level other than None, a panic in handler is recovered, counted as
// an error and logged.
func WithConfigurableMonitoring(pluginName string, level MonitoringLevel, handler func(*robot.Ctx)) func(*robot.Ctx) {
    return func(ctx *robot.Ctx) {
        if level == MonitoringLevelNone {
            handler(ctx)
            return
        }

        start := time.Now()
        var hasError bool

        // Debug level: log the request.
        if level >= MonitoringLevelDebug {
            log.Debugf("[%s] 收到消息: %s", pluginName, ctx.MessageString())
        }

        defer func() {
            duration := time.Since(start)

            // Recover before recording: otherwise a panicking execution
            // would be recorded with hasError still false.
            if r := recover(); r != nil {
                hasError = true
                monitor.GlobalMetrics.IncrementErrorCount(pluginName)
                log.Errorf("[%s] 异常: %v", pluginName, r)
            }

            // Basic monitoring.
            if level >= MonitoringLevelBasic {
                monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, hasError)
            }

            // Detailed monitoring.
            if level >= MonitoringLevelDetail {
                userID := ctx.Event.FinalFromWxId
                groupID := ctx.Event.FromWxId
                log.Infof("[%s] 用户:%s 群组:%s 耗时:%v", pluginName, userID, groupID, duration)
            }
        }()

        handler(ctx)
    }
}

2. 性能监控优化

// HighPerformanceMonitor is a high-throughput monitor (intended to reduce
// lock contention): metrics are sent over a channel and submitted in batches.
type HighPerformanceMonitor struct {
    pluginName string          // name under which executions are recorded
    metrics    chan MetricData // incoming metrics
    buffer     []MetricData    // metrics received but not yet flushed
    mutex      sync.Mutex      // held by flushMetrics while flushing
}

// MetricData is one recorded plugin execution.
type MetricData struct {
    Timestamp       time.Time     // when the metric was recorded
    Duration        time.Duration // how long the execution took
    HasError        bool          // whether the execution failed
    UserID, GroupID string        // sender user and group identifiers
}

// NewHighPerformanceMonitor creates a monitor for pluginName with a
// 1000-element buffered channel and a buffer of capacity 100, and starts the
// goroutine that processes the metrics asynchronously.
func NewHighPerformanceMonitor(pluginName string) *HighPerformanceMonitor {
    hpm := new(HighPerformanceMonitor)
    hpm.pluginName = pluginName
    hpm.metrics = make(chan MetricData, 1000) // buffered channel
    hpm.buffer = make([]MetricData, 0, 100)

    // Process monitoring data asynchronously.
    go hpm.processMetrics()

    return hpm
}

// RecordExecution enqueues one execution metric, timestamped with the
// current time, without blocking. When the channel is full the metric is
// dropped and a warning is logged.
func (hpm *HighPerformanceMonitor) RecordExecution(duration time.Duration, hasError bool, userID, groupID string) {
    sample := MetricData{
        Timestamp: time.Now(),
        Duration:  duration,
        HasError:  hasError,
        UserID:    userID,
        GroupID:   groupID,
    }

    select {
    case hpm.metrics <- sample:
    default:
        // Channel full: drop the metric rather than block the caller.
        log.Warnf("[%s] 监控通道已满,丢弃指标", hpm.pluginName)
    }
}

// processMetrics loops forever, moving metrics from the channel into the
// buffer. The buffer is flushed when it reaches 100 entries and, if
// non-empty, on every 5-second tick.
func (hpm *HighPerformanceMonitor) processMetrics() {
    // Flush as soon as this many metrics are buffered.
    const flushThreshold = 100

    flushTick := time.NewTicker(5 * time.Second)
    defer flushTick.Stop()

    for {
        select {
        case <-flushTick.C:
            // Periodic flush; nothing to do when the buffer is empty.
            if len(hpm.buffer) == 0 {
                continue
            }
            hpm.flushMetrics()

        case received := <-hpm.metrics:
            hpm.buffer = append(hpm.buffer, received)

            // Buffer full: submit the batch.
            if len(hpm.buffer) >= flushThreshold {
                hpm.flushMetrics()
            }
        }
    }
}

// flushMetrics submits every buffered metric to monitor.GlobalMetrics under
// the monitor's plugin name, then empties the buffer while keeping its
// backing storage. It holds the mutex for the duration of the flush.
func (hpm *HighPerformanceMonitor) flushMetrics() {
    hpm.mutex.Lock()
    defer hpm.mutex.Unlock()

    // Submit the batch to the monitoring system; an empty buffer results in
    // no submissions.
    pending := hpm.buffer
    for i := range pending {
        monitor.GlobalMetrics.RecordPluginExecution(hpm.pluginName, pending[i].Duration, pending[i].HasError)
    }

    // Empty the buffer.
    hpm.buffer = pending[:0]
}

3. 监控数据存储

// MonitoringStorage persists monitoring data through a gorm database handle.
type MonitoringStorage struct {
    db *gorm.DB // database handle used for inserts and queries
}

// PluginMetricRecord is the database model for one stored plugin execution
// metric.
type PluginMetricRecord struct {
    ID         uint      `gorm:"primaryKey"`
    PluginName string    `gorm:"column:plugin_name;index"`
    UserID     string    `gorm:"column:user_id;index"`
    GroupID    string    `gorm:"column:group_id;index"`
    Duration   int64     `gorm:"column:duration"` // milliseconds
    HasError   bool      `gorm:"column:has_error"`
    ErrorMsg   string    `gorm:"column:error_msg"`
    CreatedAt  time.Time `gorm:"column:created_at;index"`
}

// SaveMetrics converts metrics into PluginMetricRecord rows for pluginName
// and inserts them in batches of 100. It returns nil without touching the
// database when metrics is empty; otherwise it returns the insert's error.
func (ms *MonitoringStorage) SaveMetrics(pluginName string, metrics []MetricData) error {
    if len(metrics) == 0 {
        return nil
    }

    rows := make([]PluginMetricRecord, 0, len(metrics))
    for _, m := range metrics {
        rows = append(rows, PluginMetricRecord{
            PluginName: pluginName,
            UserID:     m.UserID,
            GroupID:    m.GroupID,
            Duration:   m.Duration.Milliseconds(),
            HasError:   m.HasError,
            CreatedAt:  m.Timestamp,
        })
    }

    // Batch insert.
    result := ms.db.CreateInBatches(rows, 100)
    return result.Error
}

// GetPluginStats queries aggregate statistics (total calls, average duration
// and error count) for pluginName over records created at or after since.
// The returned pointer is non-nil even when the query fails; check the error.
func (ms *MonitoringStorage) GetPluginStats(pluginName string, since time.Time) (*PluginStatistics, error) {
    const aggregates = "COUNT(*) as total_calls, AVG(duration) as avg_duration, COUNT(CASE WHEN has_error THEN 1 END) as error_count"

    stats := new(PluginStatistics)

    query := ms.db.Model(&PluginMetricRecord{}).
        Select(aggregates).
        Where("plugin_name = ? AND created_at >= ?", pluginName, since)
    err := query.Scan(stats).Error

    return stats, err
}

📚 API参考

监控API接口

// Global monitoring API.
monitor.GlobalMetrics.RecordPluginExecution(pluginName, duration, hasError)
monitor.GlobalMetrics.RecordMessage()
monitor.GlobalMetrics.IncrementErrorCount(pluginName)
monitor.GlobalMetrics.UpdateConnectionStatus(status)

// Plugin-name mapping.
robot.RegisterEnginePluginName(engine, pluginName)
pluginName := robot.GetEnginePluginName(engine)

// Retrieving monitoring data.
metrics := monitor.GlobalMetrics.GetCurrentMetrics()
alerts := monitor.GlobalMetrics.GetActiveAlerts()

监控配置

// MonitoringConfig is the monitoring configuration structure, with YAML keys
// given by the field tags.
type MonitoringConfig struct {
    Enabled          bool          `yaml:"enabled"`
    Level            string        `yaml:"level"` // none/basic/detail/debug
    MetricsRetention time.Duration `yaml:"metrics_retention"`
    AlertCooldown    time.Duration `yaml:"alert_cooldown"`
    WebDashboard     bool          `yaml:"web_dashboard"`
    StorageEnabled   bool          `yaml:"storage_enabled"`
}

📚 相关文档


**📊 让数据指导决策,让监控提升质量!** **用监控系统构建可观测的微信机器人!**