Skip to content

插件开发

Umoo 的插件系统允许你向平台添加新功能——新的 HTTP 端点、总线消费者、Prometheus 指标和数据库 Schema——而无需修改核心服务器代码。本指南将引导你完成后端 SDK 插件的编写、测试和注册。


目录

  1. 插件类型
  2. 插件 SDK 概览
  3. 编写后端插件
  4. HTTP 路由
  5. 数据库迁移
  6. 消息总线
  7. Prometheus 指标
  8. KV 存储
  9. 插件配置
  10. RBAC 权限
  11. 注册插件
  12. 编写代理端插件
  13. 测试

插件类型

后端插件分为两种类型:

类型接口能力参考实现
SDK 插件pluginsdk.BackendPluginHTTP 路由、Prometheus 指标、DB 迁移、总线、KV 存储、按路由 RBACinternal/backend/plugins/terminal/
经典插件backendplugin.ServerPlugin仅 DB 迁移、总线internal/backend/plugins/wireguard/

新开发请使用 SDK 插件。 经典插件因历史原因而存在,能力集更有限。


插件 SDK 概览

SDK 位于 internal/pluginsdk/零平台依赖——插件作者只需引入此包。

核心接口:

接口文件用途
BackendPluginplugin.go生命周期钩子:InitStartStopHealth
PluginManifestmanifest.go声明式能力、权限、配置 Schema
HostAPIhost.go访问 HTTP 路由器、总线、DB、KV 存储、Logger
HTTPRouterhttp.go注册带 RBAC 的 HTTP 路由
Bushost.go发布和订阅总线消息
DBhost.go运行 DB 迁移并执行租户范围查询
MetricsRegistrymetrics.go注册 Prometheus 计数器、Gauge 和 Histogram
KVStorehost.go用于插件状态的命名空间键值存储

编写后端插件

实现 BackendPlugin

internal/backend/plugins/{your-plugin}/ 下创建一个包。

go
package myplugin

import (
    "context"

    "github.com/autofacts/umoo/internal/pluginsdk"
)

type Plugin struct {
    host pluginsdk.HostAPI
}

func New() *Plugin {
    return &Plugin{}
}

func (p *Plugin) Manifest() pluginsdk.PluginManifest {
    return pluginsdk.PluginManifest{
        ID:      "myplugin",
        Version: "1.0.0",
    }
}

func (p *Plugin) Init(ctx context.Context, host pluginsdk.HostAPI) error {
    p.host = host
    return nil
}

func (p *Plugin) Start(ctx context.Context) error {
    return nil
}

func (p *Plugin) Stop(ctx context.Context) error {
    return nil
}

func (p *Plugin) Health() pluginsdk.HealthReport {
    return pluginsdk.HealthReport{Healthy: true}
}

声明 Manifest

Manifest() 方法告知平台你的插件需要哪些能力。平台只为在此声明的能力提供非 nil 的 HostAPI 子接口。

go
func (p *Plugin) Manifest() pluginsdk.PluginManifest {
    return pluginsdk.PluginManifest{
        ID:      "myplugin",
        Version: "1.0.0",

        // 声明所需的每个能力:
        HTTP: &pluginsdk.HTTPCapability{
            Routes: []pluginsdk.RouteDecl{
                {Method: "GET", Path: "/items/{id}", SubResource: "items", Action: "read"},
            },
        },
        Bus: &pluginsdk.BusCapability{
            Subscribes: []string{"evt.myplugin.v1.*"},
            Publishes:  []string{"cmd.myplugin.v1.notify"},
        },
        DB: &pluginsdk.DBCapability{},
        Metrics: &pluginsdk.MetricsCapability{},

        // 在平台 UI 中展示的配置参数:
        ConfigParams: []pluginsdk.ConfigParam{
            {
                Key:         "timeout_seconds",
                Type:        pluginsdk.ConfigParamTypeInt,
                Default:     "30",
                Description: "请求超时时间(秒)",
            },
        },

        // 启动时初始化的 RBAC 权限:
        Permissions: []pluginsdk.PluginPermissionDecl{
            {SubResource: "items", Action: "read",   Description: "读取条目"},
            {SubResource: "items", Action: "write",  Description: "创建或更新条目"},
        },
    }
}

使用 HostAPI

Init(ctx, host) 中,调用 host.*() 获取子接口。每个子接口仅在 Manifest() 中声明了相应能力时才为非 nil。

go
func (p *Plugin) Init(ctx context.Context, host pluginsdk.HostAPI) error {
    p.host = host

    // 注册 HTTP 路由
    if err := p.registerRoutes(); err != nil {
        return err
    }

    // 运行 DB 迁移
    if err := host.DB().Migrate(ctx, migrations); err != nil {
        return fmt.Errorf("migration failed: %w", err)
    }

    // 订阅总线主题
    host.Bus().Subscribe("evt.myplugin.v1.*", p.handleEvent)

    // 注册 Prometheus 指标
    p.itemsTotal = host.Metrics().NewCounter("myplugin_items_total", "已处理条目总数")

    return nil
}

Init() 中注册路由,不要在 Start() 中注册。 路由必须在 ListenAndServe 调用前完成注册。


HTTP 路由

使用 host.HTTP() 注册路由。所有路由自动挂载在 /api/v1/plugins/{plugin-id}/ 下。

go
func (p *Plugin) registerRoutes() error {
    router := p.host.HTTP()

    // 受保护路由——调用方需要 "myplugin/items:read" 权限
    router.Handle("GET /items/{id}", "items", "read", p.handleGetItem)

    // 受保护路由——调用方需要 "myplugin/items:write" 权限
    router.Handle("POST /items", "items", "write", p.handleCreateItem)

    // 公共路由——无需认证(仅用于 webhook 或健康检查)
    router.HandlePublic("GET /health", p.handleHealth)

    return nil
}

Handler 签名:

go
func (p *Plugin) handleGetItem(w http.ResponseWriter, r *http.Request) {
    // 租户 ID 由 TenantAuthMiddleware 注入到 context 中
    tenantID := r.Context().Value(tenantCtxKey).(uuid.UUID)
    id := r.PathValue("id")
    // ...
}

所有受保护路由的中间件栈:

频率限制 → JWT 认证 → 租户认证(X-Tenant-ID 请求头)→ RBAC 检查 → 你的 handler

数据库迁移

将迁移声明为 pluginsdk.Migration 切片。平台在 plugin_migrations_{plugin_id} 中跟踪已应用的迁移,并以幂等方式执行。

go
var migrations = []pluginsdk.Migration{
    {
        Version: 1,
        SQL: `
            CREATE TABLE IF NOT EXISTS myplugin_items (
                id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                tenant_id  UUID NOT NULL,
                name       TEXT NOT NULL,
                created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            );

            ALTER TABLE myplugin_items ENABLE ROW LEVEL SECURITY;

            CREATE POLICY tenant_isolation ON myplugin_items
                USING (tenant_id = current_setting('app.tenant_id', true)::uuid);
        `,
    },
}

Init 中执行:

go
if err := host.DB().Migrate(ctx, migrations); err != nil {
    return fmt.Errorf("migrate: %w", err)
}

执行查询

对于启用 RLS 的表,始终使用 WithTenant(tenantID)

go
func (p *Plugin) getItem(ctx context.Context, tenantID, itemID uuid.UUID) error {
    return p.host.DB().WithTenant(tenantID).QueryRow(ctx,
        `SELECT id, name FROM myplugin_items WHERE id = $1`,
        itemID,
    ).Scan(&item.ID, &item.Name)
}

在 RLS 表上调用 host.DB() 时,绝不能省略 WithTenant() 省略将绕过租户隔离。


消息总线

go
// 在 Init 中订阅:
host.Bus().Subscribe("evt.myplugin.v1.*", p.handleEvent)

// 在任意位置发布:
func (p *Plugin) notify(ctx context.Context, deviceID uuid.UUID) error {
    return p.host.Bus().Publish(ctx,
        fmt.Sprintf("device/%s/cmd.myplugin.v1.notify", deviceID),
        map[string]any{"timestamp": time.Now().UTC()},
    )
}

// Handler 签名:
func (p *Plugin) handleEvent(ctx context.Context, topic string, payload []byte) {
    // 解析载荷、更新状态等
}

主题命名约定:

  • evt.{plugin}.v{N}.{event} — 事件(设备→服务端或服务端内部)
  • cmd.{plugin}.v{N}.{command} — 命令(服务端→设备)
  • device/{id}/... — 按设备主题

Prometheus 指标

go
// 在 Init 中声明:
p.requestsTotal = host.Metrics().NewCounter(
    "myplugin_requests_total",
    "myplugin 插件处理的 HTTP 请求总数",
)
p.activeItems = host.Metrics().NewGauge(
    "myplugin_active_items",
    "当前活跃条目数",
)
p.requestDuration = host.Metrics().NewHistogram(
    "myplugin_request_duration_seconds",
    "HTTP 请求耗时(秒)",
)

// 使用:
p.requestsTotal.Inc()
p.activeItems.Set(float64(count))
p.requestDuration.Observe(elapsed.Seconds())

平台会自动为通过 MetricsRegistry 注册的所有指标添加 plugin_id 标签。


KV 存储

host.Store() KV 存储始终可用(无需在 Manifest 中声明)。它提供由平台存储层支持的简单命名空间键值存储。

go
// 存储:
if err := p.host.Store().Set(ctx, "last_sync", []byte(time.Now().Format(time.RFC3339))); err != nil {
    return fmt.Errorf("store set: %w", err)
}

// 读取:
val, err := p.host.Store().Get(ctx, "last_sync")
if err != nil {
    return fmt.Errorf("store get: %w", err)
}

存储按插件命名空间隔离——myplugin 中的 "last_sync" 与其他插件中的同名键完全隔离。


插件配置

通过 host.Store() 或在服务层调用 GetPluginConfig 读取按租户的插件配置。配置通过 cfg.plugins.v1.set 总线消息推送到设备代理。

go
// 读取配置快照(通过 SetPluginConfig RPC 设置):
snapshot, err := p.host.Store().Get(ctx, "config")
if err != nil {
    return nil
}
var cfg MyConfig
json.Unmarshal(snapshot, &cfg)

RBAC 权限

Manifest().Permissions 中声明的权限,由 SDKPluginManager.Init() 在启动时初始化到 permissions 表中。每个权限遵循 "{plugin-id}/{subresource}:{action}" 命名规范。

在前端代码中,使用以下方式控制组件显示:

typescript
can(perms, 'myplugin/items', 'read')   // 显示条目列表
can(perms, 'myplugin/items', 'write')  // 显示创建/编辑按钮

默认角色分配:在 PluginPermissionDecl.DefaultRoles 中声明。若未设置,权限默认不授予任何角色,需由 tenant_admin 手动分配。


注册插件

cmd/umoo/main.go 中将插件注册到 SDK 注册表:

go
import myplugin "github.com/autofacts/umoo/internal/backend/plugins/myplugin"

// 在 main() 中:
sdkRegistry.RegisterBuiltin(myplugin.New())

就这些。SDKPluginManager 在服务器启动时自动调用 InitStart。如果插件的 Init 返回错误,启动时记录错误但继续运行——其他插件不受影响。


编写代理端插件

代理端插件运行在设备上,实现 pluginsdk.AgentPlugin

go
type AgentPlugin interface {
    ID() string
    Init(ctx context.Context, host AgentHostAPI) error
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Health() HealthReport
}

与后端插件的主要区别:

  • 无 HTTP 路由或 Prometheus 指标
  • AgentHostAPI 仅提供 Bus()Store()Logger()
  • 订阅 cmd.{plugin}.* 主题,发布 evt.{plugin}.* 主题
  • 监听 cfg.plugins.v1.set 以热重载配置

源码:internal/agent/plugins/。参考 internal/agent/plugins/telemetry/ 作为简单代理端插件的示例。

cmd/umoo-agent/main.go 中注册。


测试

单元测试

在同一包中编写单元测试(package myplugin),使用文件内 mock 替代依赖项:

go
type mockDB struct {
    mu    sync.RWMutex
    items map[uuid.UUID]*Item
}

func (m *mockDB) WithTenant(id uuid.UUID) pluginsdk.DB { return m }
func (m *mockDB) Migrate(ctx context.Context, migrations []pluginsdk.Migration) error { return nil }
// ...

集成测试

集成测试位于 tests/e2e/。使用 testutil.SeedTenant() 创建测试租户,使用 setupTestEnv(t) 连接到运行中的服务器。

go
func TestMyPluginGetItem(t *testing.T) {
    requireDockerInfra(t)
    env := setupTestEnv(t)
    tenant := testutil.SeedTenant(t, env.DB)
    // 向运行中的服务器发起请求...
}

运行集成测试:

bash
make docker-up
make test-e2e

参见

Umoo — IoT Device Management Platform