Smart Fermenter API服务与数据库设计
YZDR Lv2

Smart Fermenter API服务与数据库设计

本文介绍 Smart Fermenter 系统的 API 服务实现和数据库架构设计，涵盖 Flask 路由设计、数据验证、连接池管理等核心内容。

API 架构设计

1
2
3
4
5
6
7
8
9
10
11
12
13
┌──────────────────────────────────────────────────────────────┐
│ API 请求流程 │
├──────────────────────────────────────────────────────────────┤
│ │
│ 客户端请求 ──▶ IP白名单检查 ──▶ 请求限流 ──▶ 参数验证 │
│ │ │
│ ▼ │
│ 业务处理 ──▶ 数据库操作 │
│ │ │
│ ▼ │
│ 响应包装 ──▶ JSON返回 │
│ │
└──────────────────────────────────────────────────────────────┘

Flask 应用工厂

1
2
3
4
5
6
def create_app(config_path: Optional[str] = None) -> Flask:
    """Build and configure the Flask application.

    Args:
        config_path: Optional configuration path, passed through to
            ``APIConfig`` unchanged.

    Returns:
        The configured ``Flask`` application instance.
    """
    config = APIConfig(config_path)
    setup_logging(config)

    app = Flask(__name__)
    # Emit non-ASCII text (e.g. Chinese) verbatim in JSON responses instead of
    # \uXXXX escapes. Flask 2.3+ ignores the JSON_AS_ASCII key and reads the
    # setting from the JSON provider, so set both to cover old and new versions.
    app.config["JSON_AS_ASCII"] = False
    if hasattr(app, "json"):
        app.json.ensure_ascii = False

    # NOTE(review): route and hook registration is not part of this excerpt.

    # Hand the application back; without this the factory returns None
    # despite its ``-> Flask`` annotation.
    return app

配置管理(单例模式)

1
2
3
4
5
6
7
8
9
10
class APIConfig:
    """Process-wide configuration object; every construction yields one instance."""

    # The single shared instance, created on first construction.
    _instance = None
    # Serialises first-time creation across threads.
    _lock = threading.Lock()

    def __new__(cls, config_path: Optional[str] = None):
        """Return the shared instance, creating it on the first call.

        Args:
            config_path: Accepted for the constructor signature; not used
                by ``__new__`` itself.
        """
        # Fast path: once the instance exists no lock is needed.
        existing = cls._instance
        if existing is not None:
            return existing

        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance between the unlocked check and acquiring the lock.
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

配置文件示例(api_config.json):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"server": {
"host": "0.0.0.0",
"port": 5000,
"debug": false,
"threaded": true
},
"security": {
"ip_whitelist_enabled": false,
"ip_whitelist": ["192.168.1.0/24"]
},
"validation": {
"raw_data": {
"required_fields": ["timestamp", "batch_id"]
}
},
"performance": {
"max_request_size": 1048576,
"request_timeout": 30,
"max_batch_size": 1000
}
}

核心 API 端点

1. 健康检查

1
2
3
4
5
6
7
8
9
@app.route("/api/health", methods=["GET"])
def health_check():
    """Liveness probe: report that the API process is responding.

    Returns:
        A ``(response, 200)`` tuple whose JSON body carries ``status`` and the
        current UTC time as an ISO 8601 string with a ``Z`` suffix.
    """
    # Local import: ``timezone`` is not known to be imported at file level.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the "...Z" wire format stays exactly as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return jsonify({
        "success": True,
        "data": {
            "status": "healthy",
            "timestamp": now_utc.isoformat() + "Z",
        },
    }), 200

2. 系统状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.route("/api/status", methods=["GET"])
def status():
    """Report service status, database health and the raw-data row count.

    Returns:
        A ``(response, 200)`` tuple whose JSON body contains the result of
        ``DatabaseConnector.health_check_all()``, the value of
        ``raw_data_ops.count()`` and the current UTC time (ISO 8601, ``Z``
        suffix).
    """
    # Local import: ``timezone`` is not known to be imported at file level.
    from datetime import timezone

    db_connector = DatabaseConnector()
    db_health = db_connector.health_check_all()
    raw_data_count = raw_data_ops.count()

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the "...Z" wire format stays exactly as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return jsonify({
        "success": True,
        "data": {
            "status": "running",
            "timestamp": now_utc.isoformat() + "Z",
            "database": db_health,
            "statistics": {"raw_data_count": raw_data_count},
        },
    }), 200

3. 提交传感器数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@app.route("/api/raw_data", methods=["POST"])
def create_raw_data():
    """Validate and store one sensor record or a batch of records.

    The body must be JSON: either a single object or a list of objects.
    Nothing is inserted unless every record passes validation.

    Returns:
        ``(response, 201)`` with ``inserted_count`` on success, or
        ``(response, 400)`` with an ``error`` object when the request is not
        JSON, has the wrong shape, exceeds ``performance.max_batch_size`` or
        contains invalid records.
    """
    # NOTE(review): the error payload shape ({"message": ..., "details": ...})
    # is an assumption; the original excerpt elided it as ``{...}``, which is a
    # Python set literal and cannot be serialised to JSON.
    def _bad_request(message, details=None):
        error = {"message": message}
        if details is not None:
            error["details"] = details
        return jsonify({"success": False, "error": error}), 400

    if not request.is_json:
        return _bad_request("Content-Type must be application/json")

    data = request.get_json()

    # A single object is handled as a batch of one so both request shapes share
    # the same validation and insert path (previously a dict body fell through
    # and the view returned None).
    if isinstance(data, dict):
        records = [data]
    elif isinstance(data, list):
        records = data
    else:
        return _bad_request("Request body must be a JSON object or a list of objects")

    max_batch = config.get("performance.max_batch_size", 1000)
    if len(records) > max_batch:
        return _bad_request(f"Batch size {len(records)} exceeds the limit of {max_batch}")

    # Validate every record first and collect all problems, so the client gets
    # one complete report instead of fixing records one at a time.
    errors = []
    for i, record in enumerate(records):
        if not isinstance(record, dict):
            errors.append(f"Record {i}: must be a JSON object")
            continue
        is_valid, error_msg = data_validator.validate_raw_data(record)
        if not is_valid:
            errors.append(f"Record {i}: {error_msg}")

    # Reject the whole request on any failure; the collected errors used to be
    # ignored, so invalid records reached the database.
    if errors:
        return _bad_request("Validation failed", errors)

    normalized_records = [data_validator.normalize_data(r) for r in records]
    inserted_count = raw_data_ops.batch_insert(normalized_records)

    return jsonify({
        "success": True,
        "data": {"inserted_count": inserted_count},
    }), 201

4. 获取最新数据

1
2
3
4
5
6
7
8
9
10
11
12
13
@app.route("/api/raw_data/latest", methods=["GET"])
def get_latest_raw_data():
    """Return the most recent raw-data record, optionally for one batch.

    The optional ``batch_id`` query parameter is passed to
    ``raw_data_ops.get_latest``. The response is always 200; ``data`` is
    ``None`` when no record was found.
    """
    latest = raw_data_ops.get_latest(request.args.get("batch_id"))

    if latest is None:
        body = {"success": True, "data": None}
    else:
        # Render a datetime timestamp as an ISO 8601 string in the response.
        if "timestamp" in latest:
            stamp = latest["timestamp"]
            if isinstance(stamp, datetime):
                latest["timestamp"] = stamp.isoformat()
        body = {"success": True, "data": latest}

    return jsonify(body), 200

数据验证

字段验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class DataValidator:
    """Validate raw sensor records before they are written to the database."""

    def validate_raw_data(self, data: Dict[str, Any]) -> tuple[bool, Optional[str]]:
        """Check one record for required fields, datetime fields and value ranges.

        Args:
            data: The decoded JSON object for a single record.

        Returns:
            ``(True, None)`` when the record passes every check, otherwise
            ``(False, message)`` describing the first failed check.
        """
        required_fields = ["timestamp", "batch_id"]
        for field in required_fields:
            if field not in data or data[field] is None:
                return False, f"Required field '{field}' is missing"

        # Field type check.
        field_types = self.validation_rules.get("field_types", {})
        for field, expected_type in field_types.items():
            # Read the value for this field (it was previously an undefined
            # name). Absent or null optional fields are skipped.
            value = data.get(field)
            if value is None:
                continue
            if expected_type == "datetime" and not self._is_valid_datetime(value):
                return False, f"Field '{field}' must be a valid datetime"

        # Numeric range check.
        field_ranges = {"od_600": {"min": 0, "max": 200}}
        for field, range_config in field_ranges.items():
            raw_value = data.get(field)
            # The ranged field is optional; only check it when present.
            if raw_value is None:
                continue
            try:
                num_value = float(raw_value)
            except (TypeError, ValueError):
                return False, f"Field '{field}' must be a number"
            # The chained comparison also rejects NaN, which would slip
            # through separate "<" / ">" tests.
            if not range_config["min"] <= num_value <= range_config["max"]:
                return False, f"Field '{field}' exceeds valid range"

        return True, None

数据规范化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def normalize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``data`` with values converted to their configured types.

    Types are looked up per field in ``self.validation_rules["field_types"]``.
    String values of ``"datetime"`` fields are parsed as ISO 8601 and
    ``"float"`` fields are passed through ``float()``. ``None`` values and
    fields without a configured type are copied unchanged.

    Args:
        data: One record as a field-name to value mapping.

    Returns:
        A new dict with the same keys and normalised values.

    Raises:
        ValueError: If a datetime string or float value cannot be converted.
    """
    field_types = self.validation_rules.get("field_types", {})
    normalized = {}
    for key, value in data.items():
        if value is None:
            normalized[key] = None
            continue

        # Look up the configured type for this field (it was previously an
        # undefined name).
        expected_type = field_types.get(key)

        # Type conversion.
        if expected_type == "datetime" and isinstance(value, str):
            # Rewrite only a trailing "Z" (UTC designator) as an explicit
            # offset; fromisoformat() rejects "Z" before Python 3.11.
            iso_text = value[:-1] + "+00:00" if value.endswith("Z") else value
            normalized[key] = datetime.fromisoformat(iso_text)
        elif expected_type == "float":
            normalized[key] = float(value)
        else:
            normalized[key] = value

    return normalized

数据库连接池

连接池原理

1
2
3
4
5
6
7
8
9
10
11
12
┌─────────────────────────────────────────────┐
│ Connection Pool │
├─────────────────────────────────────────────┤
│ │
│ Thread 1 ──▶ get_connection() ──▶ Conn 1 │
│ Thread 2 ──▶ get_connection() ──▶ Conn 2 │
│ Thread 3 ──▶ get_connection() ──▶ Conn 3 │
│ Thread 4 ──▶ get_connection() ──▶ ... │
│ │
│ [Conn 1] [Conn 2] [Conn 3] ... [Conn N] │
│ │
└─────────────────────────────────────────────┘

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class ConnectionPool:
    """Fixed-size pool of pre-opened database connections shared between threads."""

    def __init__(self, db_config, pool_size=5, timeout=30.0, driver="pymysql"):
        """Create the pool and open ``pool_size`` connections up front.

        Args:
            db_config: Connection settings for the database.
            pool_size: Number of connections kept in the pool.
            timeout: Default number of seconds ``get_connection`` waits for a
                free connection.
            driver: Name of the database driver.
        """
        # Keep the settings on the instance: pool_size is reported by the
        # health check and timeout is the default wait in get_connection().
        # NOTE(review): the attribute names for db_config/driver are an
        # assumption; _create_connection() is not part of this excerpt.
        self.db_config = db_config
        self.pool_size = pool_size
        self.timeout = timeout
        self.driver = driver

        self._pool: Queue = Queue(maxsize=pool_size)
        self._lock = threading.Lock()

        # Pre-create the connections.
        for _ in range(pool_size):
            self._pool.put(self._create_connection())

    def get_connection(self, timeout=None):
        """Take a live connection from the pool.

        Args:
            timeout: Seconds to wait for a free connection; ``None`` uses the
                pool's default timeout.

        Returns:
            A pooled connection, or a newly created one when the pooled
            connection was dead or the pool stayed empty for the whole wait.
        """
        # Queue.get(timeout=None) blocks forever, which made the Empty
        # fallback unreachable; fall back to the configured default instead.
        wait = self.timeout if timeout is None else timeout
        try:
            conn = self._pool.get(block=True, timeout=wait)
        except Empty:
            # Pool exhausted: hand out an extra connection.
            return self._create_connection()

        # Check that the pooled connection is still alive.
        if not self._is_connection_alive(conn):
            try:
                conn.close()
            except Exception:
                # Best effort only: the connection is already unusable and a
                # failing close() must not stop us from opening a new one.
                pass
            conn = self._create_connection()
        return conn

    def release_connection(self, conn):
        """Give a connection back to the pool, closing it if it cannot be kept.

        Dead connections are closed. Live connections are closed too when the
        pool is already full, which happens for the extra connections created
        while the pool was exhausted.
        """
        # Local import: ``Full`` is not known to be imported at file level.
        from queue import Full

        if self._is_connection_alive(conn):
            try:
                self._pool.put(conn, block=False)
            except Full:
                conn.close()
        else:
            conn.close()

健康检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def health_check(self) -> Dict[str, Any]:
    """Probe every idle connection in the pool and report the result.

    Idle connections are taken out of the queue one by one. Live ones are put
    back afterwards; dead ones are closed and not returned.

    Returns:
        A dict with ``pool_size``, ``alive_connections`` and
        ``dead_connections``. Only connections idle at the time of the check
        are counted.
    """
    alive_count = 0
    dead_count = 0
    temp_connections = []

    try:
        while True:
            # Another thread may take the last connection between an empty()
            # test and get_nowait(); treat Empty itself as the end marker.
            try:
                conn = self._pool.get_nowait()
            except Empty:
                break

            if self._is_connection_alive(conn):
                alive_count += 1
                temp_connections.append(conn)
            else:
                dead_count += 1
                conn.close()
    finally:
        # Return the live connections even if a probe or close() raised, so a
        # failed check does not drain the pool.
        for conn in temp_connections:
            self._pool.put(conn)

    return {
        "pool_size": self.pool_size,
        "alive_connections": alive_count,
        "dead_connections": dead_count,
    }

双数据库设计

1
2
3
4
5
6
7
8
9
10
┌─────────────────────┐     ┌─────────────────────┐
│ fermenter_raw_data │ │fermenter_predictions│
├─────────────────────┤ ├─────────────────────┤
│ timestamp │ │ timestamp │
│ batch_id │ │ batch_id │
│ dm_air │ │ predicted_od_600 │
│ m_ls_opt_do │ │ confidence │
│ m_ph │ │ model_version │
│ ... │ │ ... │
└─────────────────────┘ └─────────────────────┘

安全特性

IP 白名单

1
2
3
4
5
6
7
8
9
10
11
12
13
class IPWhitelist:
    """Decide whether a client IP address may call the API."""

    def is_allowed(self, client_ip: str) -> bool:
        """Return True when ``client_ip`` is permitted.

        Every address is permitted while the whitelist is disabled. Otherwise
        the address must equal a whitelisted address or fall inside a
        whitelisted network (IPv4 or IPv6).

        Args:
            client_ip: The client's address as text.

        Returns:
            True if the request may proceed, False otherwise. Text that is not
            a valid IP address is refused rather than raising.
        """
        if not self.enabled:
            return True

        try:
            ip = ipaddress.ip_address(client_ip)
        except ValueError:
            # An unparseable address can never match an entry; refuse it
            # instead of letting the error surface as a server error.
            return False

        # Handle IPv6 networks as well; previously they fell through to the
        # equality test and never matched.
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        return any(
            ip in item if isinstance(item, network_types) else ip == item
            for item in self.whitelist
        )

请求耗时监控与慢请求告警（限流的具体实现未在本文展示）

1
2
3
4
5
6
7
8
9
10
@app.after_request
def after_request(response):
    """Attach the request's processing time to the response and flag slow requests.

    Adds an ``X-Response-Time-ms`` header and logs a warning when the request
    took longer than 500 ms.

    Args:
        response: The outgoing Flask response.

    Returns:
        The same response object, with the timing header added when a start
        time is available.
    """
    # NOTE(review): g.start_time is presumably set by a before_request hook
    # that is not part of this excerpt. If it is missing, skip the timing
    # instead of raising AttributeError and turning the response into a 500.
    started = getattr(g, "start_time", None)
    if started is None:
        return response

    elapsed_ms = (time.time() - started) * 1000
    response.headers["X-Response-Time-ms"] = f"{elapsed_ms:.2f}"

    # Slow-request warning; lazy %-formatting yields the same message text.
    if elapsed_ms > 500:
        logging.warning("Slow request: %s %s", request.method, request.path)

    return response

响应头安全

1
2
# Tell browsers not to MIME-sniff the response body.
response.headers["X-Content-Type-Options"] = "nosniff"
# Forbid rendering the response inside a frame (clickjacking defence).
response.headers["X-Frame-Options"] = "DENY"

启动 API 服务

1
2
3
4
5
6
7
8
# Basic start-up
python main.py api

# Use a specific port
python main.py api --port 8080

# Use a specific host (allows remote access)
python main.py api --host 0.0.0.0 --port 5000

客户端调用示例

提交单条数据

1
2
3
4
5
6
7
8
9
# POST one record as a single JSON object to the raw-data endpoint.
curl -X POST http://localhost:5000/api/raw_data \
-H "Content-Type: application/json" \
-d '{
"timestamp": "2026-05-07T10:00:00Z",
"batch_id": "B001",
"dm_air": 1.5,
"m_ls_opt_do": 45.2,
"m_ph": 7.1
}'

批量提交

1
2
3
4
5
6
# POST several records at once as a JSON array to the raw-data endpoint.
curl -X POST http://localhost:5000/api/raw_data \
-H "Content-Type: application/json" \
-d '[
{"timestamp": "2026-05-07T10:00:00Z", "batch_id": "B001", "dm_air": 1.5},
{"timestamp": "2026-05-07T10:05:00Z", "batch_id": "B001", "dm_air": 1.6}
]'

总结

API 服务层采用 Flask 框架实现，提供标准 RESTful 接口。数据库层通过连接池技术实现高效访问，双数据库设计分离原始数据存储和预测结果存储。完善的安全机制包括 IP 白名单、数据验证、请求限流等。

下篇博客将介绍 Smart Fermenter 的未来应用与扩展方向。

相关阅读

 评论
评论插件加载失败
正在加载评论插件