本节学习目标
通过本节学习,你将能够:
- 理解面向对象编程(OOP)在数据分析中的应用价值
- 学会使用类(Class)封装数据处理逻辑
- 掌握数据处理管道(Pipeline)的构建方法
- 学会使用日志(Logging)记录程序运行状态
- 掌握异常处理(Try-Except)的最佳实践
- 理解配置文件管理,让代码更加灵活可维护
为什么学这个?
回顾一下你之前写的代码。是不是经常遇到这样的情况:
# 项目1:分析销售数据
df = pd.read_csv("sales.csv")
df = df.dropna()
df["日期"] = pd.to_datetime(df["日期"])
df = df[df["金额"] > 0]
# ... 分析代码 ...
# 项目2:分析用户数据
df = pd.read_csv("users.csv")
df = df.dropna()
df["注册时间"] = pd.to_datetime(df["注册时间"])
df = df[df["年龄"] > 0]
# ... 又是类似的分析代码 ...
# 项目3:分析产品数据
df = pd.read_csv("products.csv")
df = df.dropna()
df["上架日期"] = pd.to_datetime(df["上架日期"])
df = df[df["价格"] > 0]
# ... 又来一遍 ...
同样的清洗逻辑,写了一遍又一遍。每次改一个地方,其他所有项目都要改。这就像你有5个手机,每个都要单独充电,而不是用一根线充所有手机。
好的代码应该是可复用的。 本节要教你的,就是把"每次写一遍"变成"写一次,到处用"。
打个比方:
- 之前的你:每次做饭都从种菜开始
- 学完这节后:你有了一个"厨房",食材拿过来就能直接做
核心知识点讲解
面向对象编程(OOP)基础
为什么要在数据分析中用类?
类(Class)就是把相关的数据和操作打包在一起的容器。
# 不用类的方式:数据和操作分开
df = pd.read_csv("data.csv")
# 一堆处理代码...
# 再一堆处理代码...
# 变量到处飞,很难追踪
# 用类的方式:数据和操作在一起
processor = DataProcessor("data.csv")
processor.load()
processor.clean()
processor.analyze()
# 逻辑清晰,一目了然
类的基本概念
import pandas as pd
import numpy as np
class SimpleAnalyzer:
"""一个简单的数据分析器类"""
# __init__ 是初始化方法,创建对象时自动调用
def __init__(self, name="默认分析器"):
self.name = name # 属性:对象的数据
self.data = None # 属性:存放数据
self.report = None # 属性:存放分析结果
print(f"分析器 '{self.name}' 已创建")
# 方法:对象能做什么
def load_data(self, filepath):
"""加载数据"""
self.data = pd.read_csv(filepath)
print(f"已加载 {len(self.data)} 行数据")
return self.data
def summary(self):
"""生成数据摘要"""
if self.data is None:
return "请先加载数据"
result = {
"行数": len(self.data),
"列数": len(self.data.columns),
"缺失值": int(self.data.isnull().sum().sum()),
"重复行": int(self.data.duplicated().sum())
}
self.report = result
return result
def __str__(self):
"""定义打印对象时的显示"""
return f"分析器: {self.name}, 数据: {len(self.data) if self.data is not None else '未加载'}行"
# 使用示例
# analyzer = SimpleAnalyzer("销售分析器")
# analyzer.load_data("sales.csv")
# print(analyzer.summary())
# print(analyzer)
构建数据处理类
通用的数据清洗器
import logging
from typing import List, Dict, Optional, Union
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S"
)
class DataCleaner:
"""
通用数据清洗器
功能: 加载、清洗、转换、保存数据
"""
def __init__(self, data: Optional[pd.DataFrame] = None, name: str = "Cleaner"):
self.name = name
self.data = data
self.history = [] # 记录每一步操作
self._log = logging.getLogger(self.name)
def log_action(self, action: str, details: str = ""):
"""记录操作到历史日志"""
entry = {"action": action, "details": details, "rows": len(self.data) if self.data is not None else 0}
self.history.append(entry)
self._log.info(f"[{action}] {details}")
def load_csv(self, filepath: str, **kwargs) -> "DataCleaner":
"""从CSV加载数据"""
try:
self.data = pd.read_csv(filepath, **kwargs)
self.log_action("加载", f"从 {filepath} 加载了 {len(self.data)} 行数据")
except FileNotFoundError:
self._log.error(f"文件不存在: {filepath}")
raise
except Exception as e:
self._log.error(f"加载失败: {e}")
raise
return self # 返回self支持链式调用
def load_dict(self, data_dict: Dict) -> "DataCleaner":
"""从字典创建数据"""
self.data = pd.DataFrame(data_dict)
self.log_action("创建", f"从字典创建了 {len(self.data)} 行数据")
return self
def remove_duplicates(self, subset: Optional[List[str]] = None) -> "DataCleaner":
"""删除重复行"""
before = len(self.data)
self.data = self.data.drop_duplicates(subset=subset)
removed = before - len(self.data)
self.log_action("去重", f"删除了 {removed} 行重复数据")
return self
def fill_missing(self, strategy: str = "mean", columns: Optional[List[str]] = None) -> "DataCleaner":
"""
填充缺失值
参数:
strategy: 填充策略 ("mean", "median", "mode", "zero", "forward")
columns: 指定列,不指定则对所有数值列操作
"""
cols = columns or self.data.select_dtypes(include=[np.number]).columns.tolist()
filled_count = 0
for col in cols:
if col not in self.data.columns:
continue
missing_before = self.data[col].isnull().sum()
if strategy == "mean":
self.data[col] = self.data[col].fillna(self.data[col].mean())
elif strategy == "median":
self.data[col] = self.data[col].fillna(self.data[col].median())
elif strategy == "mode":
self.data[col] = self.data[col].fillna(self.data[col].mode()[0])
elif strategy == "zero":
self.data[col] = self.data[col].fillna(0)
elif strategy == "forward":
self.data[col] = self.data[col].ffill()
filled_count += missing_before
self.log_action("填充缺失值", f"策略={strategy}, 填充了 {filled_count} 个缺失值")
return self
def convert_types(self, type_map: Dict[str, str]) -> "DataCleaner":
"""
转换列数据类型
参数:
type_map: {"列名": "目标类型"},如 {"日期": "datetime", "金额": "float"}
"""
for col, target_type in type_map.items():
if col not in self.data.columns:
self._log.warning(f"列不存在: {col}")
continue
try:
if target_type == "datetime":
self.data[col] = pd.to_datetime(self.data[col])
elif target_type == "int":
self.data[col] = pd.to_numeric(self.data[col]).astype("Int64")
elif target_type == "float":
self.data[col] = pd.to_numeric(self.data[col]).astype(float)
elif target_type == "str":
self.data[col] = self.data[col].astype(str)
elif target_type == "category":
self.data[col] = self.data[col].astype("category")
self.log_action("类型转换", f"{col} -> {target_type}")
except Exception as e:
self._log.error(f"类型转换失败 {col}: {e}")
return self
def filter_rows(self, condition) -> "DataCleaner":
"""
按条件过滤行
参数:
condition: 布尔条件,如 df["金额"] > 0
"""
before = len(self.data)
self.data = self.data[condition]
removed = before - len(self.data)
self.log_action("过滤", f"删除了 {removed} 行,剩余 {len(self.data)} 行")
return self
def rename_columns(self, name_map: Dict[str, str]) -> "DataCleaner":
"""重命名列"""
self.data = self.data.rename(columns=name_map)
self.log_action("重命名", f"重命名了 {len(name_map)} 列")
return self
def get_summary(self) -> pd.DataFrame:
"""获取数据摘要统计"""
if self.data is None:
return pd.DataFrame()
summary = pd.DataFrame({
"类型": self.data.dtypes,
"非空数": self.data.count(),
"缺失数": self.data.isnull().sum(),
"缺失率": self.data.isnull().sum() / len(self.data),
"唯一值数": self.data.nunique()
})
# 数值列额外统计
num_cols = self.data.select_dtypes(include=[np.number]).columns
if len(num_cols) > 0:
stats = self.data[num_cols].describe().T
stats = stats.rename(columns={
"mean": "均值", "std": "标准差", "min": "最小值",
"25%": "25分位", "50%": "中位数", "75%": "75分位", "max": "最大值"
})
summary = summary.join(stats)
return summary
def save_csv(self, filepath: str) -> "DataCleaner":
"""保存为CSV"""
self.data.to_csv(filepath, index=False)
self.log_action("保存", f"已保存至 {filepath}")
return self
def print_history(self):
"""打印操作历史"""
print("\n=== 操作历史 ===")
for i, entry in enumerate(self.history, 1):
print(f" {i}. [{entry['action']}] {entry['details']} (当前: {entry['rows']}行)")
使用数据清洗器
# 模拟数据
sample_data = {
"日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-03"],
"产品": ["A", "B", None, "A", "A"],
"销售额": [100, 200, 150, None, 100],
"数量": [10, 20, 15, 8, 10],
"单价": [10.0, 10.0, 10.0, 12.5, 10.0],
}
# 链式调用演示
cleaner = DataCleaner(name="销售数据清洗")
(
cleaner
.load_dict(sample_data)
.remove_duplicates()
.fill_missing(strategy="mean", columns=["销售额"])
.fill_missing(strategy="mode", columns=["产品"])
.convert_types({"日期": "datetime"})
.filter_rows(lambda df: df["销售额"] > 0)
)
print("\n=== 清洗后的数据 ===")
print(cleaner.data)
print("\n=== 数据摘要 ===")
print(cleaner.get_summary())
cleaner.print_history()
Pipeline(管道)构建
什么是Pipeline?
Pipeline就像一个工厂流水线:
原料 -> 清洗 -> 切割 -> 组装 -> 包装 -> 成品
1 2 3 4 5 6
每一步只做好一件事,然后把结果传给下一步。这样的好处是:
- 清晰:每一步做什么一目了然
- 可复用:可以替换其中的某一步
- 可维护:出问题能快速定位到哪一步
实现分析Pipeline
class AnalysisPipeline:
"""
数据分析管道
将数据加载、清洗、分析、报告串联起来
"""
def __init__(self, name="分析管道"):
self.name = name
self.steps = [] # 管道步骤列表
self.data = None # 当前数据
self.results = {} # 各步骤的结果
self._log = logging.getLogger(name)
def add_step(self, name: str, func, **kwargs):
"""添加一个处理步骤"""
self.steps.append({"name": name, "func": func, "kwargs": kwargs})
self._log.info(f"添加步骤: {name}")
return self
def run(self, data: pd.DataFrame) -> pd.DataFrame:
"""运行整个管道"""
self.data = data.copy()
self._log.info(f"开始运行管道 '{self.name}', 输入数据: {len(self.data)} 行")
for step in self.steps:
try:
self._log.info(f"执行步骤: {step['name']}")
self.data = step["func"](self.data, **step["kwargs"])
self.results[step["name"]] = {
"rows": len(self.data),
"columns": len(self.data.columns)
}
self._log.info(f" -> 完成: {len(self.data)} 行, {len(self.data.columns)} 列")
except Exception as e:
self._log.error(f"步骤 '{step['name']}' 执行失败: {e}")
raise
self._log.info(f"管道运行完成!")
return self.data
def get_report(self) -> str:
"""生成管道运行报告"""
report = [f"\n=== 管道运行报告: {self.name} ==="]
for name, info in self.results.items():
report.append(f" {name}: {info['rows']} 行, {info['columns']} 列")
return "\n".join(report)
# ========== 使用示例 ==========
# 定义各个步骤的处理函数
def step_remove_duplicates(df, **kwargs):
before = len(df)
df = df.drop_duplicates()
logging.info(f" 去重: {before} -> {len(df)} 行")
return df
def step_fill_missing(df, strategy="mean", columns=None, **kwargs):
cols = columns or df.select_dtypes(include=[np.number]).columns.tolist()
for col in cols:
if col in df.columns:
if strategy == "mean":
df[col] = df[col].fillna(df[col].mean())
elif strategy == "median":
df[col] = df[col].fillna(df[col].median())
logging.info(f" 填充缺失值: 策略={strategy}")
return df
def step_convert_dates(df, columns=None, **kwargs):
cols = columns or []
for col in cols:
if col in df.columns:
df[col] = pd.to_datetime(df[col])
logging.info(f" 日期转换: {len(cols)} 列")
return df
def step_add_features(df, **kwargs):
"""添加衍生特征"""
num_cols = df.select_dtypes(include=[np.number]).columns
for col in num_cols:
df[f"{col}_标准化"] = (df[col] - df[col].mean()) / df[col].std()
logging.info(f" 添加特征: {len(num_cols)} 个标准化列")
return df
def step_filter_valid(df, **kwargs):
"""过滤有效数据"""
before = len(df)
# 这里可以根据业务规则过滤
df = df.dropna(subset=df.select_dtypes(include=[np.number]).columns[:1])
logging.info(f" 过滤: {before} -> {len(df)} 行")
return df
# 创建并运行管道
sample_data = pd.DataFrame({
"日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-02"],
"A": [100, 200, None, 200],
"B": [50, 100, 75, 100],
"C": ["x", "y", "z", "y"]
})
pipeline = AnalysisPipeline("销售数据处理管道")
(
pipeline
.add_step("去重", step_remove_duplicates)
.add_step("填充缺失值", step_fill_missing, strategy="mean")
.add_step("日期转换", step_convert_dates, columns=["日期"])
.add_step("特征工程", step_add_features)
.add_step("数据过滤", step_filter_valid)
)
result = pipeline.run(sample_data)
print(pipeline.get_report())
print("\n最终数据:")
print(result)
日志记录(Logging)
为什么用日志而不是print?
| 对比项 | logging | |
|---|---|---|
| 日志级别 | 无区分 | DEBUG/INFO/WARNING/ERROR/CRITICAL |
| 输出控制 | 难以控制 | 可以通过级别过滤 |
| 格式定制 | 手动 | 自动格式化(时间、级别、模块) |
| 文件输出 | 复杂 | 一行配置即可 |
| 生产可用 | 不推荐 | 强烈推荐 |
配置日志系统
import logging
import os
def setup_logger(name: str, log_file: str = None, level: int = logging.INFO) -> logging.Logger:
"""
配置一个带文件输出的日志器
参数:
name: 日志器名称
log_file: 日志文件路径(可选)
level: 日志级别
"""
logger = logging.getLogger(name)
logger.setLevel(level)
# 避免重复添加handler
if logger.handlers:
return logger
# 格式化器
formatter = logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# 控制台输出
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 文件输出(如果指定了文件)
if log_file:
os.makedirs(os.path.dirname(log_file) if os.path.dirname(log_file) else ".", exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.info(f"日志文件: {log_file}")
return logger
# 使用示例
logger = setup_logger("数据分析", log_file="analysis.log")
logger.debug("这是一条调试信息(通常不显示)")
logger.info("数据处理开始")
logger.warning("发现3个缺失值,已填充")
logger.error("文件读取失败,使用默认值")
异常处理最佳实践
数据处理中的常见异常
class DataProcessingError(Exception):
"""自定义异常类"""
def __init__(self, message: str, detail: str = ""):
super().__init__(message)
self.detail = detail
self.message = message
def __str__(self):
if self.detail:
return f"{self.message} - {self.detail}"
return self.message
def safe_divide(a, b, default=0):
"""安全的除法运算"""
try:
return a / b
except ZeroDivisionError:
return default
except TypeError:
return default
def load_data_safely(filepath: str) -> pd.DataFrame:
"""安全地加载数据文件"""
if not os.path.exists(filepath):
raise DataProcessingError("文件不存在", f"路径: {filepath}")
if not filepath.endswith((".csv", ".xlsx", ".parquet")):
raise DataProcessingError("不支持的文件格式", f"文件: {filepath}")
try:
if filepath.endswith(".csv"):
df = pd.read_csv(filepath)
elif filepath.endswith(".xlsx"):
df = pd.read_excel(filepath)
elif filepath.endswith(".parquet"):
df = pd.read_parquet(filepath)
return df
except pd.errors.EmptyDataError:
raise DataProcessingError("文件为空", f"文件: {filepath}")
except Exception as e:
raise DataProcessingError("数据加载失败", str(e))
完整的异常处理示例
def run_analysis_with_safety(filepath: str):
"""带完整异常处理的分析流程"""
logger = setup_logger("安全分析")
try:
# 1. 加载数据
logger.info("正在加载数据...")
df = load_data_safely(filepath)
logger.info(f"成功加载 {len(df)} 行数据")
# 2. 数据验证
required_cols = ["日期", "销售额"]
missing = [col for col in required_cols if col not in df.columns]
if missing:
raise DataProcessingError("缺少必要列", f"缺失: {missing}")
# 3. 数据清洗
logger.info("正在清洗数据...")
df["销售额"] = pd.to_numeric(df["销售额"], errors="coerce")
n_missing = df["销售额"].isnull().sum()
if n_missing > len(df) * 0.5:
logger.warning(f"超过50%的销售额缺失({n_missing}行),分析结果可能不可靠")
df = df.dropna(subset=["销售额"])
df = df[df["销售额"] > 0]
logger.info(f"清洗后剩余 {len(df)} 行有效数据")
# 4. 分析计算
logger.info("正在计算统计指标...")
summary = {
"总销售额": df["销售额"].sum(),
"平均销售额": df["销售额"].mean(),
"最大销售额": df["销售额"].max(),
"记录数": len(df)
}
logger.info("分析完成!")
for key, value in summary.items():
logger.info(f" {key}: {value}")
return summary
except DataProcessingError as e:
logger.error(f"数据处理错误: {e}")
return None
except Exception as e:
logger.error(f"未知错误: {e}")
return None
finally:
logger.info("分析流程结束") # finally总是执行
配置文件管理
为什么需要配置文件?
把"会变的东西"和"不会变的东西"分开。
# 不好的做法:硬编码 db_host = "192.168.1.100" # 换环境就要改代码 db_port = 5432 output_dir = "/data/reports" threshold = 0.85 # 好的做法:配置文件 # config.yaml # database: # host: "192.168.1.100" # port: 5432 # paths: # output: "/data/reports" # analysis: # threshold: 0.85
使用Python配置文件
import json
import os
class ConfigManager:
"""配置文件管理器"""
def __init__(self, config_path: str = "config.json"):
self.config_path = config_path
self.config = {}
self.load()
def load(self):
"""加载配置文件"""
if os.path.exists(self.config_path):
with open(self.config_path, "r", encoding="utf-8") as f:
self.config = json.load(f)
logging.info(f"已加载配置: {self.config_path}")
else:
logging.warning(f"配置文件不存在: {self.config_path},使用默认配置")
self.config = self._get_defaults()
self.save()
def _get_defaults(self):
"""默认配置"""
return {
"data": {
"source_dir": "./data",
"output_dir": "./output",
"file_format": "csv"
},
"analysis": {
"outlier_threshold": 3.0,
"missing_fill_strategy": "mean",
"date_columns": ["日期", "时间", "timestamp"]
},
"report": {
"title": "数据分析报告",
"author": "数据分析系统",
"format": "html"
},
"logging": {
"level": "INFO",
"file": "analysis.log"
}
}
def get(self, key_path: str, default=None):
"""
获取配置值(支持嵌套路径)
例: config.get("data.source_dir")
"""
keys = key_path.split(".")
value = self.config
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
return default
return value
def set(self, key_path: str, value):
"""设置配置值"""
keys = key_path.split(".")
config = self.config
for key in keys[:-1]:
if key not in config:
config[key] = {}
config = config[key]
config[keys[-1]] = value
def save(self):
"""保存配置到文件"""
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(self.config, f, indent=2, ensure_ascii=False)
logging.info(f"配置已保存: {self.config_path}")
# 使用示例
# config = ConfigManager()
# print(config.get("data.source_dir")) # 获取值
# config.set("analysis.outlier_threshold", 2.5) # 设置值
# config.save() # 保存
七、综合实战:完整的数据处理项目
# ========== 综合实战:模块化数据处理系统 ==========
import os
import json
from datetime import datetime
class DataAnalysisProject:
"""
模块化的数据分析项目
整合了清洗器、管道、日志、配置等功能
"""
def __init__(self, project_name: str, config_path: str = None):
self.project_name = project_name
self.start_time = datetime.now()
self.logger = setup_logger(project_name, log_file=f"{project_name}.log")
# 加载配置
self.config = ConfigManager(config_path) if config_path else ConfigManager()
self.logger.info(f"项目 '{project_name}' 已启动")
def run(self, data: pd.DataFrame) -> Dict:
"""运行完整分析流程"""
self.logger.info("=" * 50)
self.logger.info("开始执行分析流程")
self.logger.info("=" * 50)
try:
# Step 1: 数据概览
self.logger.info("Step 1: 数据概览")
overview = self._data_overview(data)
# Step 2: 数据清洗
self.logger.info("Step 2: 数据清洗")
clean_data = self._clean_data(data)
# Step 3: 数据分析
self.logger.info("Step 3: 数据分析")
analysis_results = self._analyze_data(clean_data)
# Step 4: 生成报告
self.logger.info("Step 4: 生成报告")
report = self._generate_report(overview, analysis_results)
# Step 5: 输出结果
self.logger.info("Step 5: 输出结果")
output_dir = self.config.get("data.output_dir", "./output")
os.makedirs(output_dir, exist_ok=True)
clean_data.to_csv(f"{output_dir}/cleaned_data.csv", index=False)
with open(f"{output_dir}/analysis_report.json", "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False, default=str)
elapsed = (datetime.now() - self.start_time).total_seconds()
self.logger.info(f"项目完成! 耗时: {elapsed:.1f}秒")
return report
except Exception as e:
self.logger.error(f"分析流程失败: {e}")
raise
def _data_overview(self, df: pd.DataFrame) -> Dict:
"""数据概览"""
return {
"rows": len(df),
"columns": len(df.columns),
"column_names": list(df.columns),
"dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
"missing_values": int(df.isnull().sum().sum()),
"duplicate_rows": int(df.duplicated().sum())
}
def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""数据清洗"""
cleaner = DataCleaner(name=f"{self.project_name}_清洗")
cleaner.data = df.copy()
# 去重
cleaner.remove_duplicates()
# 填充缺失值
strategy = self.config.get("analysis.missing_fill_strategy", "mean")
cleaner.fill_missing(strategy=strategy)
# 转换日期列
date_cols = self.config.get("analysis.date_columns", [])
type_map = {col: "datetime" for col in date_cols if col in df.columns}
if type_map:
cleaner.convert_types(type_map)
return cleaner.data
def _analyze_data(self, df: pd.DataFrame) -> Dict:
"""数据分析"""
num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
results = {}
for col in num_cols:
results[col] = {
"count": int(df[col].count()),
"mean": round(float(df[col].mean()), 2),
"std": round(float(df[col].std()), 2),
"min": round(float(df[col].min()), 2),
"max": round(float(df[col].max()), 2),
"median": round(float(df[col].median()), 2)
}
return results
def _generate_report(self, overview: Dict, analysis: Dict) -> Dict:
"""生成报告"""
report = {
"project": self.project_name,
"timestamp": datetime.now().isoformat(),
"data_overview": overview,
"statistics": analysis,
"title": self.config.get("report.title", "分析报告"),
"author": self.config.get("report.author", "数据分析系统")
}
return report
# ========== 使用示例 ==========
# 创建模拟数据
sample_df = pd.DataFrame({
"日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05",
"2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05"],
"销售额": [100, 200, 150, None, 180, 220, 190, 210, 170, 230],
"利润": [30, 60, 45, 50, 55, 70, 58, 65, 52, 75],
"客户数": [10, 25, 18, 20, 22, 30, 24, 28, 21, 32],
"产品": ["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
})
# 运行项目
project = DataAnalysisProject("销售分析项目")
# report = project.run(sample_df)
实战练习
练习1:封装你的数据清洗类
题目: 在 DataCleaner 的基础上,添加以下功能:
- 添加
normalize_columns方法:对指定列做最大-最小归一化 - 添加
outlier_removal方法:使用IQR方法移除异常值 - 添加
export_report方法:导出清洗报告为JSON
# 参考答案
class EnhancedCleaner(DataCleaner):
"""增强版数据清洗器"""
def normalize_columns(self, columns: Optional[List[str]] = None) -> "EnhancedCleaner":
"""最大-最小归一化"""
cols = columns or self.data.select_dtypes(include=[np.number]).columns.tolist()
for col in cols:
if col in self.data.columns:
min_val = self.data[col].min()
max_val = self.data[col].max()
if max_val > min_val:
self.data[col] = (self.data[col] - min_val) / (max_val - min_val)
self.log_action("归一化", f"归一化了 {len(cols)} 列")
return self
def outlier_removal(self, columns: Optional[List[str]] = None) -> "EnhancedCleaner":
"""使用IQR方法移除异常值"""
cols = columns or self.data.select_dtypes(include=[np.number]).columns.tolist()
before = len(self.data)
for col in cols:
if col in self.data.columns:
q1 = self.data[col].quantile(0.25)
q3 = self.data[col].quantile(0.75)
iqr = q3 - q1
lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr
self.data = self.data[(self.data[col] >= lower) & (self.data[col] <= upper)]
removed = before - len(self.data)
self.log_action("异常值移除", f"使用IQR方法移除了 {removed} 行")
return self
def export_report(self, filepath: str) -> str:
"""导出清洗报告为JSON"""
report = {
"name": self.name,
"final_rows": len(self.data),
"columns": list(self.data.columns),
"dtypes": {col: str(dtype) for col, dtype in self.data.dtypes.items()},
"missing_values": {col: int(self.data[col].isnull().sum()) for col in self.data.columns},
"history": self.history
}
with open(filepath, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False, default=str)
self.log_action("导出报告", f"已导出至 {filepath}")
return filepath
本节总结
本节我们从"写脚本"进阶到了"写系统":
- 面向对象编程:学会了用类封装数据和处理逻辑,让代码更有组织
- 数据清洗器:构建了一个通用的 DataCleaner 类,支持链式调用
- Pipeline设计:理解了管道模式,将复杂流程拆分为可复用的步骤
- 日志系统:用 logging 替代 print,获得更专业的程序运行记录
- 异常处理:学会了自定义异常、安全的错误处理和 finally 的使用
- 配置管理:把可变参数外置到配置文件,提高代码的灵活性
关键收获:
- 好代码的标准:可复用、可维护、可读性好
- 链式调用(返回self)让数据处理像搭积木一样优雅
- 日志和异常处理是生产级代码的标配,不是"可选项"
- 配置与代码分离,让你的系统能适应不同环境













