@app.route("/api/raw_data", methods=["POST"])
def create_raw_data():
    """Create raw-data records from a JSON request body.

    A JSON array is handled as a batch: it is rejected when it is longer
    than ``performance.max_batch_size`` (default 1000), every record is
    validated, and only when all of them pass are the records normalized
    and inserted in one call.

    Returns:
        A ``(response, status)`` pair: 201 with the inserted count on
        success; 400 for a non-JSON body, an oversized batch, or a batch
        containing at least one invalid record.
    """
    # NOTE(review): ``{...}`` below is the error payload that the original
    # snippet elides; it is kept as-is and must be filled in with the
    # project's real error structure.
    if not request.is_json:
        return jsonify({"success": False, "error": {...}}), 400

    data = request.get_json()

    # Batch insert
    if isinstance(data, list):
        max_batch = config.get("performance.max_batch_size", 1000)
        if len(data) > max_batch:
            return jsonify({"success": False, "error": {...}}), 400

        # Validate every record first so the client sees all problems at once.
        errors = []
        for i, record in enumerate(data):
            is_valid, error_msg = data_validator.validate_raw_data(record)
            if not is_valid:
                errors.append(f"Record {i}: {error_msg}")

        # Previously the collected errors were never inspected, so invalid
        # records were still normalized and written. Reject the whole batch.
        if errors:
            # NOTE(review): payload keys chosen here because the original
            # error schema is elided — align with the project's convention.
            return jsonify({
                "success": False,
                "error": {"message": "Validation failed", "details": errors},
            }), 400

        # Bulk insert into the database
        normalized_records = [data_validator.normalize_data(r) for r in data]
        inserted_count = raw_data_ops.batch_insert(normalized_records)
        return jsonify({
            "success": True,
            "data": {"inserted_count": inserted_count}
        }), 201

    # NOTE(review): the single-record (non-list) path is not shown in this
    # excerpt; as written, a non-list body falls through without a response.
4. 获取最新数据
1 2 3 4 5 6 7 8 9 10 11 12 13
@app.route("/api/raw_data/latest", methods=["GET"])
def get_latest_raw_data():
    """Return the most recent raw-data record as JSON.

    The optional ``batch_id`` query parameter is passed straight to
    ``raw_data_ops.get_latest``. When no record is found the response
    still reports success, with ``data`` set to ``None``.

    Returns:
        A ``(response, 200)`` pair in both cases.
    """
    latest = raw_data_ops.get_latest(request.args.get("batch_id"))

    if latest is not None:
        # datetime objects are emitted as ISO 8601 strings
        needs_iso = "timestamp" in latest and isinstance(latest["timestamp"], datetime)
        if needs_iso:
            latest["timestamp"] = latest["timestamp"].isoformat()

    return jsonify({"success": True, "data": latest}), 200
数据验证
字段验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
class DataValidator:
    """Validate raw-data records against required fields, types and ranges."""

    def validate_raw_data(self, data: Dict[str, Any]) -> tuple[bool, Optional[str]]:
        """Check a single raw-data record.

        Checks run in order: required fields, configured field types
        (``self.validation_rules["field_types"]``), then numeric ranges.
        Optional fields that are absent or ``None`` are skipped.

        Args:
            data: The record to check.

        Returns:
            ``(True, None)`` when the record passes, otherwise
            ``(False, message)`` describing the first failure found.
        """
        # Batch elements come from request JSON and may not be objects.
        if not isinstance(data, dict):
            return False, "Record must be a JSON object"

        required_fields = ["timestamp", "batch_id"]
        for field in required_fields:
            if data.get(field) is None:
                return False, f"Required field '{field}' is missing"

        # Field type checks
        field_types = self.validation_rules.get("field_types", {})
        for field, expected_type in field_types.items():
            value = data.get(field)
            if value is None:
                continue
            if expected_type == "datetime" and not self._is_valid_datetime(value):
                return False, f"Field '{field}' must be a valid datetime"

        # Numeric range checks
        field_ranges = {"od_600": {"min": 0, "max": 200}}
        for field, range_config in field_ranges.items():
            value = data.get(field)
            if value is None:
                continue
            try:
                num_value = float(value)
            except (TypeError, ValueError):
                return False, f"Field '{field}' must be a number"
            # Written as a chained comparison so NaN is rejected as well.
            if not (range_config["min"] <= num_value <= range_config["max"]):
                return False, f"Field '{field}' exceeds valid range"

        return True, None
数据规范化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
def normalize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *data* with values converted to their configured types.

    The expected type of each key is looked up in
    ``self.validation_rules["field_types"]``, the same mapping that
    ``validate_raw_data`` reads. ``None`` values and keys without a
    configured type are copied unchanged.

    Args:
        data: The record to normalize; it is not modified.

    Returns:
        A new dict with the same keys as *data*.

    Raises:
        ValueError: If a ``"datetime"`` string is not ISO 8601, or a
            ``"float"`` value cannot be converted.
    """
    # The original never bound ``expected_type``; resolve it per key here.
    field_types = self.validation_rules.get("field_types", {})

    normalized = {}
    for key, value in data.items():
        if value is None:
            normalized[key] = None
            continue

        expected_type = field_types.get(key)

        # Type conversion
        if expected_type == "datetime" and isinstance(value, str):
            # A trailing "Z" is rewritten as an explicit UTC offset before parsing.
            normalized[key] = datetime.fromisoformat(value.replace("Z", "+00:00"))
        elif expected_type == "float":
            normalized[key] = float(value)
        else:
            normalized[key] = value
    return normalized
class IPWhitelist:
    """Check client IP addresses against a configured whitelist."""

    def is_allowed(self, client_ip: str) -> bool:
        """Return whether *client_ip* passes the whitelist.

        When the whitelist is disabled (``self.enabled`` is falsy) every
        client is allowed. Otherwise the address must fall inside a
        network entry of ``self.whitelist`` or equal an address entry.

        Args:
            client_ip: The client's address as an IPv4 or IPv6 string.

        Returns:
            ``True`` if access is allowed. A string that is not a valid
            IP address is denied rather than raising.
        """
        if not self.enabled:
            return True

        try:
            ip = ipaddress.ip_address(client_ip)
        except ValueError:
            # Fail closed: an unparseable address can never be whitelisted.
            return False

        # Both network families get a containment test; previously only
        # IPv4Network did, so IPv6 network entries could never match.
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        for item in self.whitelist:
            if isinstance(item, network_types):
                if ip in item:
                    return True
            elif ip == item:
                return True
        return False