weather-forecast-v2:入门数据型扩展
案例背景
weather-forecast-v2 是 NeoMind 生态中最简单的「数据型扩展」——它定时从 Open-Meteo API 拉取天气数据,将温度、湿度、风速等指标写入 NeoMind 指标系统。
同时提供一个 React 卡片组件用于仪表板展示。整个扩展约 700 行 Rust + 560 行 TypeScript,没有任何 AI 推理、流式处理或协议桥接逻辑,是新手理解「一个扩展由哪些部分组成」的最短路径。
它解决了什么问题? NeoMind 仪表板需要展示实时环境数据(温度、湿度、风速),但这些数据来自外部 HTTP API 而非本地设备。weather-forecast-v2 充当「数据代理」——把外部 API 的数据拉取到 NeoMind 指标系统,使仪表板组件可以像消费任何本地设备指标一样消费天气数据。
目标读者:刚读完 扩展 API 通用参考、想动手写第一个扩展的开发者。不需要 Rust 高级知识,但要理解 trait、async/await、模块系统等基础概念。
在生态中的位置:weather-forecast-v2 是「数据型扩展」的范本——它不依赖任何硬件设备、不涉及 AI 模型、不桥接工业协议。后续案例中,#2(yolo-device-inference)会在它的基础上增加模型加载,#4(onvif-bridge)会增加协议栈复杂度。掌握了本案例的 8 节内容,你就掌握了所有数据型扩展的骨架。
你将学到:
- 如何用
ExtensionMetadata::new()构建器声明扩展元数据 - 如何用
metric_float!/ExtensionMetricValue产出周期指标 - 为什么在动态库场景下选择同步 HTTP 客户端(
ureq)而非reqwest - 如何把 React 组件通过 Vite UMD 包暴露给 NeoMind 仪表板加载器
架构总览
weather-forecast-v2 由三部分组成:Rust 扩展核心(数据拉取 + 指标产出)、React 前端组件(卡片 UI)、NeoMind 运行时(加载 + 调度)。下图展示了数据流向和进程边界。
4 个核心抽象
| 抽象 | 位置 | 作用 |
|---|---|---|
Extension trait | SDK | 扩展必须实现的接口:metadata() / metrics() / commands() / execute_command() / produce_metrics() / configure() |
ExtensionMetricValue | SDK | 一个指标数据点 = { name, value, timestamp },produce_metrics() 返回 Vec<ExtensionMetricValue> |
ExtensionMetadata builder | SDK | 链式构建器:new(id, name, version).with_description(...).with_config_parameters(...) |
neomind_export! 宏 | SDK | 生成 C FFI 导出函数,让宿主进程能通过 C ABI 加载 Rust cdylib |
实现剖析
目录结构
extensions/weather-forecast-v2/
├── Cargo.toml # 依赖声明 + crate 元信息
├── metadata.json # 扩展清单(构建脚本从 Cargo.toml 自动生成)
├── src/
│ └── lib.rs # 唯一的 Rust 源文件(~580 行,含测试)
├── frontend/
│ ├── package.json # Vite + React 18 + TypeScript
│ ├── vite.config.ts # UMD 打包配置(externalize React)
│ └── src/
│ └── index.tsx # WeatherCard 组件(~560 行)
└── tests/
└── extension_test.rs
单文件 Rust 设计是故意的——weather-forecast-v2 的逻辑复杂度不足以拆分模块。后续案例(如 yolo-device-inference)会拆出 onnx_utils.rs / camera.rs 等子模块。
ExtensionMetadata 构建器链
查看完整实现:src/lib.rs L219-273
fn metadata(&self) -> &ExtensionMetadata {
// why static: metadata 是只读的,全进程只需一份实例
static META: std::sync::OnceLock<ExtensionMetadata> = std::sync::OnceLock::new();
META.get_or_init(|| {
ExtensionMetadata::new(
"weather-forecast-v2", // why kebab-case: 与目录名、metadata.json id 一致
"Weather Forecast V2",
"2.0.0"
)
.with_description("Weather forecast extension for the NeoMind isolated runtime using a sync HTTP client")
.with_author("NeoMind Team")
.with_config_parameters(vec![
ParameterDefinition {
name: "defaultCity".to_string(),
display_name: "Default City".to_string(),
// why options enum: 前端下拉框直接消费,不需要额外枚举定义
options: vec!["Beijing", "Shanghai", "New York", "London", "Tokyo"]
.into_iter().map(String::from).collect(),
..Default::default()
},
// ... refreshInterval / unit 参数省略,见源码 L247-270
])
})
}
// lib.rs L219-L273 (trimmed)
fn metadata(&self) -> &ExtensionMetadata {
static META: std::sync::OnceLock<ExtensionMetadata> = std::sync::OnceLock::new();
META.get_or_init(|| {
ExtensionMetadata::new(
"weather-forecast-v2",
"Weather Forecast V2",
"2.0.0"
)
.with_description("Weather forecast extension for the NeoMind isolated runtime using a sync HTTP client")
.with_author("NeoMind Team")
.with_config_parameters(vec![
ParameterDefinition {
name: "defaultCity".to_string(),
display_name: "Default City".to_string(),
description: "Default city for weather display".to_string(),
param_type: MetricDataType::String,
required: false,
default_value: Some(ParamMetricValue::String("Beijing".to_string())),
min: None,
max: None,
options: vec![
"Beijing".to_string(),
"Shanghai".to_string(),
"New York".to_string(),
"London".to_string(),
"Tokyo".to_string(),
],
},
ParameterDefinition {
name: "refreshInterval".to_string(),
为什么用 OnceLock 而非 lazy_static!? OnceLock 是 Rust 1.70 标准库引入的,不需要额外依赖。metadata 在第一次调用 metadata() 时构建,之后所有调用返回同一引用——这对 FFI 边界很重要,因为宿主进程会频繁查询 metadata。
指标产出:AtomicI64 + 定点小数
查看完整实现:src/lib.rs L80-92(字段定义)、L119-129(存储)、L466-522(产出)
pub struct WeatherExtension {
// why RwLock for city: 城市名需要读写(configure 改写),但读远多于写
default_city: std::sync::RwLock<String>,
// why AtomicI64 for metrics: 指标值需要高频读(produce_metrics)+ 高频写(store_weather_metrics)
// why定点存储 ×100: AtomicI64 不支持 f64,用 ×100 保留两位小数精度
last_temperature_c: AtomicI64, // 实际值 = stored / 100.0
last_humidity_percent: AtomicI64, // 整数,不需要缩放
request_count: AtomicI64,
has_data: AtomicBool, // why flag: 首次请求前 has_data=false,produce_metrics 只返回 request_count
}
// 存储时乘 100 转定点
fn store_weather_metrics(&self, weather: &WeatherResult) {
self.last_temperature_c.store(
(weather.temperature_c * 100.0) as i64, Ordering::SeqCst);
// ... 其他字段类似
}
// 产出时除 100 还原浮点
fn produce_metrics(&self) -> Result<Vec<ExtensionMetricValue>> {
let mut metrics = Vec::with_capacity(9);
metrics.push(ExtensionMetricValue {
name: "temperature_c".to_string(),
value: ParamMetricValue::Float(
self.last_temperature_c.load(Ordering::SeqCst) as f64 / 100.0),
timestamp: now,
});
// ... has_data=false 时只返回 request_count
Ok(metrics)
}
// lib.rs L80-L92
pub struct WeatherExtension {
default_city: std::sync::RwLock<String>,
request_count: AtomicI64,
last_temperature_c: AtomicI64,
last_feels_like_c: AtomicI64,
last_humidity_percent: AtomicI64,
last_wind_speed_kmph: AtomicI64,
last_wind_direction_deg: AtomicI64,
last_cloud_cover_percent: AtomicI64,
last_pressure_hpa: AtomicI64,
last_update_ts: AtomicI64,
has_data: AtomicBool,
}
// lib.rs L119-L129
fn store_weather_metrics(&self, weather: &WeatherResult) {
self.last_temperature_c.store((weather.temperature_c * 100.0) as i64, Ordering::SeqCst);
self.last_feels_like_c.store((weather.feels_like_c * 100.0) as i64, Ordering::SeqCst);
self.last_humidity_percent.store(weather.humidity_percent as i64, Ordering::SeqCst);
self.last_wind_speed_kmph.store((weather.wind_speed_kmph * 100.0) as i64, Ordering::SeqCst);
self.last_wind_direction_deg.store(weather.wind_direction_deg as i64, Ordering::SeqCst);
self.last_cloud_cover_percent.store(weather.cloud_cover_percent as i64, Ordering::SeqCst);
self.last_pressure_hpa.store((weather.pressure_hpa * 100.0) as i64, Ordering::SeqCst);
self.last_update_ts.store(chrono::Utc::now().timestamp_millis(), Ordering::SeqCst);
self.has_data.store(true, Ordering::SeqCst);
}
// lib.rs L466-L522 (trimmed)
fn produce_metrics(&self) -> Result<Vec<ExtensionMetricValue>> {
let now = chrono::Utc::now().timestamp_millis();
let mut metrics = Vec::with_capacity(9);
metrics.push(ExtensionMetricValue {
name: "request_count".to_string(),
value: ParamMetricValue::Integer(self.request_count.load(Ordering::SeqCst)),
timestamp: now,
});
if self.has_data.load(Ordering::SeqCst) {
metrics.extend(vec![
ExtensionMetricValue {
name: "temperature_c".to_string(),
value: ParamMetricValue::Float(self.last_temperature_c.load(Ordering::SeqCst) as f64 / 100.0),
timestamp: now,
},
ExtensionMetricValue {
name: "feels_like_c".to_string(),
value: ParamMetricValue::Float(self.last_feels_like_c.load(Ordering::SeqCst) as f64 / 100.0),
timestamp: now,
},
ExtensionMetricValue {
name: "humidity_percent".to_string(),
value: ParamMetricValue::Integer(self.last_humidity_percent.load(Ordering::SeqCst)),
timestamp: now,
},
ExtensionMetricValue {
name: "wind_speed_kmph".to_string(),
value: ParamMetricValue::Float(self.last_wind_speed_kmph.load(Ordering::SeqCst) as f64 / 100.0),
timestamp: now,
},
ExtensionMetricValue {
name: "wind_direction_deg".to_string(),
为什么不用 Mutex<WeatherResult>? 因为 produce_metrics() 是同步方法,会被运行时周期性高频调用。如果用 Mutex,每次读取都要获取锁,在高并发指标采集场景下会造成锁争用。AtomicI64 的 load 是无锁操作,性能开销最小。
HTTP 客户端:ureq 同步
查看完整实现:src/lib.rs L132-167
fn geocode_sync(&self, city: &str) -> std::result::Result<GeoLocation, String> {
let encoded_city = urlencoding::encode(city);
let url = format!(
"https://geocoding-api.open-meteo.com/v1/search?name={}&count=1&language=en&format=json",
encoded_city
);
// why ureq not reqwest: cdylib 内部不能启动 Tokio runtime(会与宿主冲突)
let response: serde_json::Value = ureq::get(&url)
.timeout(std::time::Duration::from_secs(30))
.call()
.map_err(|e| format!("HTTP error: {}", e))?
.into_json()
.map_err(|e| format!("JSON error: {}", e))?;
// ...
}
// lib.rs L132-L167 (trimmed)
fn get_weather_sync(&self, city: &str) -> Result<WeatherResult> {
self.request_count.fetch_add(1, Ordering::SeqCst);
let location = self.geocode_sync(city)
.map_err(|e| ExtensionError::ExecutionFailed(e))?;
let mut weather = self.fetch_weather_sync(&location)
.map_err(|e| ExtensionError::ExecutionFailed(e))?;
weather.timestamp = Some(chrono::Utc::now().to_rfc3339());
self.store_weather_metrics(&weather);
Ok(weather)
}
fn geocode_sync(&self, city: &str) -> std::result::Result<GeoLocation, String> {
let encoded_city = urlencoding::encode(city);
let url = format!(
"https://geocoding-api.open-meteo.com/v1/search?name={}&count=1&language=en&format=json",
encoded_city
);
let response: serde_json::Value = ureq::get(&url)
.timeout(std::time::Duration::from_secs(30))
.call()
Cargo.toml 第 21 行有明确注释:# Use sync HTTP client to avoid Tokio runtime issues in dynamic libraries。这是整个案例最关键的设计决策,详见 4.1。
命令流时序:get_weather 端到端调用链
下方的时序图展示了 get_weather 命令从运行时调度到指标产出的完整调用链,标注了缓存读取(RwLock)、两次外部 HTTP 请求(Geocoding → Forecast)、以及定点数编码(×100 / ÷100)发生的时机。这是贡献者修改本扩展时需要理解的最小行为单元——尤其是缓存命中/未命中的分支位置和定点编码的边界。
前端入口:Vite UMD 包
查看完整实现:frontend/src/index.tsx L425-565
WeatherCard 组件通过 forwardRef 暴露,Vite 构建为 UMD 包(weather-forecast-v2-components.umd.cjs)。metadata.json 的 frontend 字段声明入口:
{
"frontend": {
"components": ["WeatherCard"],
"entrypoint": "weather-forecast-v2-components.umd.cjs"
}
}
前端通过 fetch('/api/extensions/weather-forecast-v2/command') 调用后端命令,带 3 次重试和初始化错误检测——这保证了扩展加载延迟时 UI 不会直接报错。
设计权衡
同步 HTTP 客户端(ureq)vs 异步(reqwest)
决策:使用 ureq 2.x(同步阻塞 HTTP 客户端)。
被否决的方案:
- A.
reqwest异步客户端 → 否决原因:reqwest依赖 Tokio runtime,而 NeoMind 扩展以.dylib/.so/.dll形式被宿主进程动态加载。在动态库内部启动 Tokio runtime 会与宿主进程的运行时产生冲突(宿主可能已经有自己的 runtime),导致 panic 或 undefined behavior。Cargo.toml第 21 行的注释明确记录了这一点。 - B. 原始
hyper+ 手动线程池 → 否决原因:过于底层,HTTP/JSON 解析、连接池、超时控制都需要手写,维护成本远超收益。weather-forecast-v2 的 HTTP 调用量极小(每次命令一次 geocoding + 一次 weather fetch),不需要异步并发。
权衡代价:同步调用会阻塞调用线程。execute_command 是 async 方法,但内部的 get_weather_sync 是同步阻塞的——调用期间该 task 无法让出控制权。