插件开发
Umoo 的插件系统允许你向平台添加新功能——新的 HTTP 端点、总线消费者、Prometheus 指标和数据库 Schema——而无需修改核心服务器代码。本指南将引导你完成后端 SDK 插件的编写、测试和注册。
目录
插件类型
后端插件分为两种类型:
| 类型 | 接口 | 能力 | 参考实现 |
|---|---|---|---|
| SDK 插件 | pluginsdk.BackendPlugin | HTTP 路由、Prometheus 指标、DB 迁移、总线、KV 存储、按路由 RBAC | internal/backend/plugins/terminal/ |
| 经典插件 | backendplugin.ServerPlugin | 仅 DB 迁移、总线 | internal/backend/plugins/wireguard/ |
新开发请使用 SDK 插件。 经典插件因历史原因而存在,能力集更有限。
插件 SDK 概览
SDK 位于 internal/pluginsdk/,零平台依赖——插件作者只需引入此包。
核心接口:
| 接口 | 文件 | 用途 |
|---|---|---|
BackendPlugin | plugin.go | 生命周期钩子:Init、Start、Stop、Health |
PluginManifest | manifest.go | 声明式能力、权限、配置 Schema |
HostAPI | host.go | 访问 HTTP 路由器、总线、DB、KV 存储、Logger |
HTTPRouter | http.go | 注册带 RBAC 的 HTTP 路由 |
Bus | host.go | 发布和订阅总线消息 |
DB | host.go | 运行 DB 迁移并执行租户范围查询 |
MetricsRegistry | metrics.go | 注册 Prometheus 计数器、Gauge 和 Histogram |
KVStore | host.go | 用于插件状态的命名空间键值存储 |
编写后端插件
实现 BackendPlugin
在 internal/backend/plugins/{your-plugin}/ 下创建一个包。
package myplugin
import (
"context"
"github.com/autofacts/umoo/internal/pluginsdk"
)
type Plugin struct {
host pluginsdk.HostAPI
}
func New() *Plugin {
return &Plugin{}
}
func (p *Plugin) Manifest() pluginsdk.PluginManifest {
return pluginsdk.PluginManifest{
ID: "myplugin",
Version: "1.0.0",
}
}
func (p *Plugin) Init(ctx context.Context, host pluginsdk.HostAPI) error {
p.host = host
return nil
}
func (p *Plugin) Start(ctx context.Context) error {
return nil
}
func (p *Plugin) Stop(ctx context.Context) error {
return nil
}
func (p *Plugin) Health() pluginsdk.HealthReport {
return pluginsdk.HealthReport{Healthy: true}
}声明 Manifest
Manifest() 方法告知平台你的插件需要哪些能力。平台只为在此声明的能力提供非 nil 的 HostAPI 子接口。
func (p *Plugin) Manifest() pluginsdk.PluginManifest {
return pluginsdk.PluginManifest{
ID: "myplugin",
Version: "1.0.0",
// 声明所需的每个能力:
HTTP: &pluginsdk.HTTPCapability{
Routes: []pluginsdk.RouteDecl{
{Method: "GET", Path: "/items/{id}", SubResource: "items", Action: "read"},
},
},
Bus: &pluginsdk.BusCapability{
Subscribes: []string{"evt.myplugin.v1.*"},
Publishes: []string{"cmd.myplugin.v1.notify"},
},
DB: &pluginsdk.DBCapability{},
Metrics: &pluginsdk.MetricsCapability{},
// 在平台 UI 中展示的配置参数:
ConfigParams: []pluginsdk.ConfigParam{
{
Key: "timeout_seconds",
Type: pluginsdk.ConfigParamTypeInt,
Default: "30",
Description: "请求超时时间(秒)",
},
},
// 启动时初始化的 RBAC 权限:
Permissions: []pluginsdk.PluginPermissionDecl{
{SubResource: "items", Action: "read", Description: "读取条目"},
{SubResource: "items", Action: "write", Description: "创建或更新条目"},
},
}
}使用 HostAPI
在 Init(ctx, host) 中,调用 host.*() 获取子接口。每个子接口仅在 Manifest() 中声明了相应能力时才为非 nil。
func (p *Plugin) Init(ctx context.Context, host pluginsdk.HostAPI) error {
p.host = host
// 注册 HTTP 路由
if err := p.registerRoutes(); err != nil {
return err
}
// 运行 DB 迁移
if err := host.DB().Migrate(ctx, migrations); err != nil {
return fmt.Errorf("migration failed: %w", err)
}
// 订阅总线主题
host.Bus().Subscribe("evt.myplugin.v1.*", p.handleEvent)
// 注册 Prometheus 指标
p.itemsTotal = host.Metrics().NewCounter("myplugin_items_total", "已处理条目总数")
return nil
}在
Init()中注册路由,不要在Start()中注册。 路由必须在ListenAndServe调用前完成注册。
HTTP 路由
使用 host.HTTP() 注册路由。所有路由自动挂载在 /api/v1/plugins/{plugin-id}/ 下。
func (p *Plugin) registerRoutes() error {
router := p.host.HTTP()
// 受保护路由——调用方需要 "myplugin/items:read" 权限
router.Handle("GET /items/{id}", "items", "read", p.handleGetItem)
// 受保护路由——调用方需要 "myplugin/items:write" 权限
router.Handle("POST /items", "items", "write", p.handleCreateItem)
// 公共路由——无需认证(仅用于 webhook 或健康检查)
router.HandlePublic("GET /health", p.handleHealth)
return nil
}Handler 签名:
func (p *Plugin) handleGetItem(w http.ResponseWriter, r *http.Request) {
// 租户 ID 由 TenantAuthMiddleware 注入到 context 中
tenantID := r.Context().Value(tenantCtxKey).(uuid.UUID)
id := r.PathValue("id")
// ...
}所有受保护路由的中间件栈:
频率限制 → JWT 认证 → 租户认证(X-Tenant-ID 请求头)→ RBAC 检查 → 你的 handler数据库迁移
将迁移声明为 pluginsdk.Migration 切片。平台在 plugin_migrations_{plugin_id} 中跟踪已应用的迁移,并以幂等方式执行。
var migrations = []pluginsdk.Migration{
{
Version: 1,
SQL: `
CREATE TABLE IF NOT EXISTS myplugin_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
name TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
ALTER TABLE myplugin_items ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON myplugin_items
USING (tenant_id = current_setting('app.tenant_id', true)::uuid);
`,
},
}在 Init 中执行:
if err := host.DB().Migrate(ctx, migrations); err != nil {
return fmt.Errorf("migrate: %w", err)
}执行查询
对于启用 RLS 的表,始终使用 WithTenant(tenantID):
func (p *Plugin) getItem(ctx context.Context, tenantID, itemID uuid.UUID) error {
return p.host.DB().WithTenant(tenantID).QueryRow(ctx,
`SELECT id, name FROM myplugin_items WHERE id = $1`,
itemID,
).Scan(&item.ID, &item.Name)
}在 RLS 表上调用
host.DB()时,绝不能省略WithTenant()。 省略将绕过租户隔离。
消息总线
// 在 Init 中订阅:
host.Bus().Subscribe("evt.myplugin.v1.*", p.handleEvent)
// 在任意位置发布:
func (p *Plugin) notify(ctx context.Context, deviceID uuid.UUID) error {
return p.host.Bus().Publish(ctx,
fmt.Sprintf("device/%s/cmd.myplugin.v1.notify", deviceID),
map[string]any{"timestamp": time.Now().UTC()},
)
}
// Handler 签名:
func (p *Plugin) handleEvent(ctx context.Context, topic string, payload []byte) {
// 解析载荷、更新状态等
}主题命名约定:
evt.{plugin}.v{N}.{event}— 事件(设备→服务端或服务端内部)cmd.{plugin}.v{N}.{command}— 命令(服务端→设备)device/{id}/...— 按设备主题
Prometheus 指标
// 在 Init 中声明:
p.requestsTotal = host.Metrics().NewCounter(
"myplugin_requests_total",
"myplugin 插件处理的 HTTP 请求总数",
)
p.activeItems = host.Metrics().NewGauge(
"myplugin_active_items",
"当前活跃条目数",
)
p.requestDuration = host.Metrics().NewHistogram(
"myplugin_request_duration_seconds",
"HTTP 请求耗时(秒)",
)
// 使用:
p.requestsTotal.Inc()
p.activeItems.Set(float64(count))
p.requestDuration.Observe(elapsed.Seconds())平台会自动为通过 MetricsRegistry 注册的所有指标添加 plugin_id 标签。
KV 存储
host.Store() KV 存储始终可用(无需在 Manifest 中声明)。它提供由平台存储层支持的简单命名空间键值存储。
// 存储:
if err := p.host.Store().Set(ctx, "last_sync", []byte(time.Now().Format(time.RFC3339))); err != nil {
return fmt.Errorf("store set: %w", err)
}
// 读取:
val, err := p.host.Store().Get(ctx, "last_sync")
if err != nil {
return fmt.Errorf("store get: %w", err)
}存储按插件命名空间隔离——myplugin 中的 "last_sync" 与其他插件中的同名键完全隔离。
插件配置
通过 host.Store() 或在服务层调用 GetPluginConfig 读取按租户的插件配置。配置通过 cfg.plugins.v1.set 总线消息推送到设备代理。
// 读取配置快照(通过 SetPluginConfig RPC 设置):
snapshot, err := p.host.Store().Get(ctx, "config")
if err != nil {
return nil
}
var cfg MyConfig
json.Unmarshal(snapshot, &cfg)RBAC 权限
在 Manifest().Permissions 中声明的权限,由 SDKPluginManager.Init() 在启动时初始化到 permissions 表中。每个权限遵循 "{plugin-id}/{subresource}:{action}" 命名规范。
在前端代码中,使用以下方式控制组件显示:
can(perms, 'myplugin/items', 'read') // 显示条目列表
can(perms, 'myplugin/items', 'write') // 显示创建/编辑按钮默认角色分配:在 PluginPermissionDecl.DefaultRoles 中声明。若未设置,权限默认不授予任何角色,需由 tenant_admin 手动分配。
注册插件
在 cmd/umoo/main.go 中将插件注册到 SDK 注册表:
import myplugin "github.com/autofacts/umoo/internal/backend/plugins/myplugin"
// 在 main() 中:
sdkRegistry.RegisterBuiltin(myplugin.New())就这些。SDKPluginManager 在服务器启动时自动调用 Init → Start。如果插件的 Init 返回错误,启动时记录错误但继续运行——其他插件不受影响。
编写代理端插件
代理端插件运行在设备上,实现 pluginsdk.AgentPlugin:
type AgentPlugin interface {
ID() string
Init(ctx context.Context, host AgentHostAPI) error
Start(ctx context.Context) error
Stop(ctx context.Context) error
Health() HealthReport
}与后端插件的主要区别:
- 无 HTTP 路由或 Prometheus 指标
AgentHostAPI仅提供Bus()、Store()和Logger()- 订阅
cmd.{plugin}.*主题,发布evt.{plugin}.*主题 - 监听
cfg.plugins.v1.set以热重载配置
源码:internal/agent/plugins/。参考 internal/agent/plugins/telemetry/ 作为简单代理端插件的示例。
在 cmd/umoo-agent/main.go 中注册。
测试
单元测试
在同一包中编写单元测试(package myplugin),使用文件内 mock 替代依赖项:
type mockDB struct {
mu sync.RWMutex
items map[uuid.UUID]*Item
}
func (m *mockDB) WithTenant(id uuid.UUID) pluginsdk.DB { return m }
func (m *mockDB) Migrate(ctx context.Context, migrations []pluginsdk.Migration) error { return nil }
// ...集成测试
集成测试位于 tests/e2e/。使用 testutil.SeedTenant() 创建测试租户,使用 setupTestEnv(t) 连接到运行中的服务器。
func TestMyPluginGetItem(t *testing.T) {
requireDockerInfra(t)
env := setupTestEnv(t)
tenant := testutil.SeedTenant(t, env.DB)
// 向运行中的服务器发起请求...
}运行集成测试:
make docker-up
make test-e2e