本节学习目标

通过本节学习,你将能够:

  1. 理解面向对象编程(OOP)在数据分析中的应用价值
  2. 学会使用类(Class)封装数据处理逻辑
  3. 掌握数据处理管道(Pipeline)的构建方法
  4. 学会使用日志(Logging)记录程序运行状态
  5. 掌握异常处理(Try-Except)的最佳实践
  6. 理解配置文件管理,让代码更加灵活可维护

为什么学这个?

回顾一下你之前写的代码。是不是经常遇到这样的情况:

# 项目1:分析销售数据
df = pd.read_csv("sales.csv")
df = df.dropna()
df["日期"] = pd.to_datetime(df["日期"])
df = df[df["金额"] > 0]
# ... 分析代码 ...

# 项目2:分析用户数据
df = pd.read_csv("users.csv")
df = df.dropna()
df["注册时间"] = pd.to_datetime(df["注册时间"])
df = df[df["年龄"] > 0]
# ... 又是类似的分析代码 ...

# 项目3:分析产品数据
df = pd.read_csv("products.csv")
df = df.dropna()
df["上架日期"] = pd.to_datetime(df["上架日期"])
df = df[df["价格"] > 0]
# ... 又来一遍 ...

同样的清洗逻辑,写了一遍又一遍。每次改一个地方,其他所有项目都要改。这就像你有5个手机,每个都要单独充电,而不是用一根线充所有手机。

好的代码应该是可复用的。 本节要教你的,就是把"每次写一遍"变成"写一次,到处用"。

打个比方:

  • 之前的你:每次做饭都从种菜开始
  • 学完这节后:你有了一个"厨房",食材拿过来就能直接做

核心知识点讲解

面向对象编程(OOP)基础

为什么要在数据分析中用类?

类(Class)就是把相关的数据操作打包在一起的容器。

# 不用类的方式:数据和操作分开
df = pd.read_csv("data.csv")
# 一堆处理代码...
# 再一堆处理代码...
# 变量到处飞,很难追踪

# 用类的方式:数据和操作在一起
processor = DataProcessor("data.csv")
processor.load()
processor.clean()
processor.analyze()
# 逻辑清晰,一目了然

类的基本概念

import pandas as pd
import numpy as np

class SimpleAnalyzer:
    """一个简单的数据分析器类"""

    # __init__ 是初始化方法,创建对象时自动调用
    def __init__(self, name="默认分析器"):
        self.name = name           # 属性:对象的数据
        self.data = None           # 属性:存放数据
        self.report = None         # 属性:存放分析结果
        print(f"分析器 '{self.name}' 已创建")

    # 方法:对象能做什么
    def load_data(self, filepath):
        """加载数据"""
        self.data = pd.read_csv(filepath)
        print(f"已加载 {len(self.data)} 行数据")
        return self.data

    def summary(self):
        """生成数据摘要"""
        if self.data is None:
            return "请先加载数据"
        result = {
            "行数": len(self.data),
            "列数": len(self.data.columns),
            "缺失值": int(self.data.isnull().sum().sum()),
            "重复行": int(self.data.duplicated().sum())
        }
        self.report = result
        return result

    def __str__(self):
        """定义打印对象时的显示"""
        return f"分析器: {self.name}, 数据: {len(self.data) if self.data is not None else '未加载'}行"


# 使用示例
# analyzer = SimpleAnalyzer("销售分析器")
# analyzer.load_data("sales.csv")
# print(analyzer.summary())
# print(analyzer)

构建数据处理类

通用的数据清洗器

import logging
from typing import List, Dict, Optional, Union

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S"
)

class DataCleaner:
    """
    通用数据清洗器
    功能: 加载、清洗、转换、保存数据
    """

    def __init__(self, data: Optional[pd.DataFrame] = None, name: str = "Cleaner"):
        self.name = name
        self.data = data
        self.history = []  # 记录每一步操作
        self._log = logging.getLogger(self.name)

    def log_action(self, action: str, details: str = ""):
        """记录操作到历史日志"""
        entry = {"action": action, "details": details, "rows": len(self.data) if self.data is not None else 0}
        self.history.append(entry)
        self._log.info(f"[{action}] {details}")

    def load_csv(self, filepath: str, **kwargs) -> "DataCleaner":
        """从CSV加载数据"""
        try:
            self.data = pd.read_csv(filepath, **kwargs)
            self.log_action("加载", f"从 {filepath} 加载了 {len(self.data)} 行数据")
        except FileNotFoundError:
            self._log.error(f"文件不存在: {filepath}")
            raise
        except Exception as e:
            self._log.error(f"加载失败: {e}")
            raise
        return self  # 返回self支持链式调用

    def load_dict(self, data_dict: Dict) -> "DataCleaner":
        """从字典创建数据"""
        self.data = pd.DataFrame(data_dict)
        self.log_action("创建", f"从字典创建了 {len(self.data)} 行数据")
        return self

    def remove_duplicates(self, subset: Optional[List[str]] = None) -> "DataCleaner":
        """删除重复行"""
        before = len(self.data)
        self.data = self.data.drop_duplicates(subset=subset)
        removed = before - len(self.data)
        self.log_action("去重", f"删除了 {removed} 行重复数据")
        return self

    def fill_missing(self, strategy: str = "mean", columns: Optional[List[str]] = None) -> "DataCleaner":
        """
        填充缺失值

        参数:
            strategy: 填充策略 ("mean", "median", "mode", "zero", "forward")
            columns: 指定列,不指定则对所有数值列操作
        """
        cols = columns or self.data.select_dtypes(include=[np.number]).columns.tolist()
        filled_count = 0

        for col in cols:
            if col not in self.data.columns:
                continue
            missing_before = self.data[col].isnull().sum()

            if strategy == "mean":
                self.data[col] = self.data[col].fillna(self.data[col].mean())
            elif strategy == "median":
                self.data[col] = self.data[col].fillna(self.data[col].median())
            elif strategy == "mode":
                self.data[col] = self.data[col].fillna(self.data[col].mode()[0])
            elif strategy == "zero":
                self.data[col] = self.data[col].fillna(0)
            elif strategy == "forward":
                self.data[col] = self.data[col].ffill()

            filled_count += missing_before

        self.log_action("填充缺失值", f"策略={strategy}, 填充了 {filled_count} 个缺失值")
        return self

    def convert_types(self, type_map: Dict[str, str]) -> "DataCleaner":
        """
        转换列数据类型

        参数:
            type_map: {"列名": "目标类型"},如 {"日期": "datetime", "金额": "float"}
        """
        for col, target_type in type_map.items():
            if col not in self.data.columns:
                self._log.warning(f"列不存在: {col}")
                continue
            try:
                if target_type == "datetime":
                    self.data[col] = pd.to_datetime(self.data[col])
                elif target_type == "int":
                    self.data[col] = pd.to_numeric(self.data[col]).astype("Int64")
                elif target_type == "float":
                    self.data[col] = pd.to_numeric(self.data[col]).astype(float)
                elif target_type == "str":
                    self.data[col] = self.data[col].astype(str)
                elif target_type == "category":
                    self.data[col] = self.data[col].astype("category")
                self.log_action("类型转换", f"{col} -> {target_type}")
            except Exception as e:
                self._log.error(f"类型转换失败 {col}: {e}")
        return self

    def filter_rows(self, condition) -> "DataCleaner":
        """
        按条件过滤行

        参数:
            condition: 布尔条件,如 df["金额"] > 0
        """
        before = len(self.data)
        self.data = self.data[condition]
        removed = before - len(self.data)
        self.log_action("过滤", f"删除了 {removed} 行,剩余 {len(self.data)} 行")
        return self

    def rename_columns(self, name_map: Dict[str, str]) -> "DataCleaner":
        """重命名列"""
        self.data = self.data.rename(columns=name_map)
        self.log_action("重命名", f"重命名了 {len(name_map)} 列")
        return self

    def get_summary(self) -> pd.DataFrame:
        """获取数据摘要统计"""
        if self.data is None:
            return pd.DataFrame()
        summary = pd.DataFrame({
            "类型": self.data.dtypes,
            "非空数": self.data.count(),
            "缺失数": self.data.isnull().sum(),
            "缺失率": self.data.isnull().sum() / len(self.data),
            "唯一值数": self.data.nunique()
        })
        # 数值列额外统计
        num_cols = self.data.select_dtypes(include=[np.number]).columns
        if len(num_cols) > 0:
            stats = self.data[num_cols].describe().T
            stats = stats.rename(columns={
                "mean": "均值", "std": "标准差", "min": "最小值",
                "25%": "25分位", "50%": "中位数", "75%": "75分位", "max": "最大值"
            })
            summary = summary.join(stats)
        return summary

    def save_csv(self, filepath: str) -> "DataCleaner":
        """保存为CSV"""
        self.data.to_csv(filepath, index=False)
        self.log_action("保存", f"已保存至 {filepath}")
        return self

    def print_history(self):
        """打印操作历史"""
        print("\n=== 操作历史 ===")
        for i, entry in enumerate(self.history, 1):
            print(f"  {i}. [{entry['action']}] {entry['details']} (当前: {entry['rows']}行)")

使用数据清洗器

# 模拟数据
sample_data = {
    "日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-03"],
    "产品": ["A", "B", None, "A", "A"],
    "销售额": [100, 200, 150, None, 100],
    "数量": [10, 20, 15, 8, 10],
    "单价": [10.0, 10.0, 10.0, 12.5, 10.0],
}

# 链式调用演示
cleaner = DataCleaner(name="销售数据清洗")
(
    cleaner
    .load_dict(sample_data)
    .remove_duplicates()
    .fill_missing(strategy="mean", columns=["销售额"])
    .fill_missing(strategy="mode", columns=["产品"])
    .convert_types({"日期": "datetime"})
    .filter_rows(lambda df: df["销售额"] > 0)
)

print("\n=== 清洗后的数据 ===")
print(cleaner.data)
print("\n=== 数据摘要 ===")
print(cleaner.get_summary())
cleaner.print_history()

Pipeline(管道)构建

什么是Pipeline?

Pipeline就像一个工厂流水线:

原料 -> 清洗 -> 切割 -> 组装 -> 包装 -> 成品
  1       2       3       4       5       6

每一步只做好一件事,然后把结果传给下一步。这样的好处是:

  • 清晰:每一步做什么一目了然
  • 可复用:可以替换其中的某一步
  • 可维护:出问题能快速定位到哪一步

实现分析Pipeline

class AnalysisPipeline:
    """
    数据分析管道
    将数据加载、清洗、分析、报告串联起来
    """

    def __init__(self, name="分析管道"):
        self.name = name
        self.steps = []         # 管道步骤列表
        self.data = None        # 当前数据
        self.results = {}       # 各步骤的结果
        self._log = logging.getLogger(name)

    def add_step(self, name: str, func, **kwargs):
        """添加一个处理步骤"""
        self.steps.append({"name": name, "func": func, "kwargs": kwargs})
        self._log.info(f"添加步骤: {name}")
        return self

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        """运行整个管道"""
        self.data = data.copy()
        self._log.info(f"开始运行管道 '{self.name}', 输入数据: {len(self.data)} 行")

        for step in self.steps:
            try:
                self._log.info(f"执行步骤: {step['name']}")
                self.data = step["func"](self.data, **step["kwargs"])
                self.results[step["name"]] = {
                    "rows": len(self.data),
                    "columns": len(self.data.columns)
                }
                self._log.info(f"  -> 完成: {len(self.data)} 行, {len(self.data.columns)} 列")
            except Exception as e:
                self._log.error(f"步骤 '{step['name']}' 执行失败: {e}")
                raise

        self._log.info(f"管道运行完成!")
        return self.data

    def get_report(self) -> str:
        """生成管道运行报告"""
        report = [f"\n=== 管道运行报告: {self.name} ==="]
        for name, info in self.results.items():
            report.append(f"  {name}: {info['rows']} 行, {info['columns']} 列")
        return "\n".join(report)


# ========== 使用示例 ==========

# 定义各个步骤的处理函数
def step_remove_duplicates(df, **kwargs):
    before = len(df)
    df = df.drop_duplicates()
    logging.info(f"  去重: {before} -> {len(df)} 行")
    return df

def step_fill_missing(df, strategy="mean", columns=None, **kwargs):
    cols = columns or df.select_dtypes(include=[np.number]).columns.tolist()
    for col in cols:
        if col in df.columns:
            if strategy == "mean":
                df[col] = df[col].fillna(df[col].mean())
            elif strategy == "median":
                df[col] = df[col].fillna(df[col].median())
    logging.info(f"  填充缺失值: 策略={strategy}")
    return df

def step_convert_dates(df, columns=None, **kwargs):
    cols = columns or []
    for col in cols:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col])
    logging.info(f"  日期转换: {len(cols)} 列")
    return df

def step_add_features(df, **kwargs):
    """添加衍生特征"""
    num_cols = df.select_dtypes(include=[np.number]).columns
    for col in num_cols:
        df[f"{col}_标准化"] = (df[col] - df[col].mean()) / df[col].std()
    logging.info(f"  添加特征: {len(num_cols)} 个标准化列")
    return df

def step_filter_valid(df, **kwargs):
    """过滤有效数据"""
    before = len(df)
    # 这里可以根据业务规则过滤
    df = df.dropna(subset=df.select_dtypes(include=[np.number]).columns[:1])
    logging.info(f"  过滤: {before} -> {len(df)} 行")
    return df

# 创建并运行管道
sample_data = pd.DataFrame({
    "日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-02"],
    "A": [100, 200, None, 200],
    "B": [50, 100, 75, 100],
    "C": ["x", "y", "z", "y"]
})

pipeline = AnalysisPipeline("销售数据处理管道")
(
    pipeline
    .add_step("去重", step_remove_duplicates)
    .add_step("填充缺失值", step_fill_missing, strategy="mean")
    .add_step("日期转换", step_convert_dates, columns=["日期"])
    .add_step("特征工程", step_add_features)
    .add_step("数据过滤", step_filter_valid)
)

result = pipeline.run(sample_data)
print(pipeline.get_report())
print("\n最终数据:")
print(result)

日志记录(Logging)

为什么用日志而不是print?

对比项 print logging
日志级别 无区分 DEBUG/INFO/WARNING/ERROR/CRITICAL
输出控制 难以控制 可以通过级别过滤
格式定制 手动 自动格式化(时间、级别、模块)
文件输出 复杂 一行配置即可
生产可用 不推荐 强烈推荐

配置日志系统

import logging
import os

def setup_logger(name: str, log_file: str = None, level: int = logging.INFO) -> logging.Logger:
    """
    配置一个带文件输出的日志器

    参数:
        name: 日志器名称
        log_file: 日志文件路径(可选)
        level: 日志级别
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # 避免重复添加handler
    if logger.handlers:
        return logger

    # 格式化器
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    # 控制台输出
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # 文件输出(如果指定了文件)
    if log_file:
        os.makedirs(os.path.dirname(log_file) if os.path.dirname(log_file) else ".", exist_ok=True)
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.info(f"日志文件: {log_file}")

    return logger

# 使用示例
logger = setup_logger("数据分析", log_file="analysis.log")

logger.debug("这是一条调试信息(通常不显示)")
logger.info("数据处理开始")
logger.warning("发现3个缺失值,已填充")
logger.error("文件读取失败,使用默认值")

异常处理最佳实践

数据处理中的常见异常

class DataProcessingError(Exception):
    """自定义异常类"""
    def __init__(self, message: str, detail: str = ""):
        super().__init__(message)
        self.detail = detail
        self.message = message

    def __str__(self):
        if self.detail:
            return f"{self.message} - {self.detail}"
        return self.message


def safe_divide(a, b, default=0):
    """安全的除法运算"""
    try:
        return a / b
    except ZeroDivisionError:
        return default
    except TypeError:
        return default


def load_data_safely(filepath: str) -> pd.DataFrame:
    """安全地加载数据文件"""
    if not os.path.exists(filepath):
        raise DataProcessingError("文件不存在", f"路径: {filepath}")

    if not filepath.endswith((".csv", ".xlsx", ".parquet")):
        raise DataProcessingError("不支持的文件格式", f"文件: {filepath}")

    try:
        if filepath.endswith(".csv"):
            df = pd.read_csv(filepath)
        elif filepath.endswith(".xlsx"):
            df = pd.read_excel(filepath)
        elif filepath.endswith(".parquet"):
            df = pd.read_parquet(filepath)
        return df
    except pd.errors.EmptyDataError:
        raise DataProcessingError("文件为空", f"文件: {filepath}")
    except Exception as e:
        raise DataProcessingError("数据加载失败", str(e))

完整的异常处理示例

def run_analysis_with_safety(filepath: str):
    """带完整异常处理的分析流程"""
    logger = setup_logger("安全分析")

    try:
        # 1. 加载数据
        logger.info("正在加载数据...")
        df = load_data_safely(filepath)
        logger.info(f"成功加载 {len(df)} 行数据")

        # 2. 数据验证
        required_cols = ["日期", "销售额"]
        missing = [col for col in required_cols if col not in df.columns]
        if missing:
            raise DataProcessingError("缺少必要列", f"缺失: {missing}")

        # 3. 数据清洗
        logger.info("正在清洗数据...")
        df["销售额"] = pd.to_numeric(df["销售额"], errors="coerce")
        n_missing = df["销售额"].isnull().sum()
        if n_missing > len(df) * 0.5:
            logger.warning(f"超过50%的销售额缺失({n_missing}行),分析结果可能不可靠")

        df = df.dropna(subset=["销售额"])
        df = df[df["销售额"] > 0]
        logger.info(f"清洗后剩余 {len(df)} 行有效数据")

        # 4. 分析计算
        logger.info("正在计算统计指标...")
        summary = {
            "总销售额": df["销售额"].sum(),
            "平均销售额": df["销售额"].mean(),
            "最大销售额": df["销售额"].max(),
            "记录数": len(df)
        }

        logger.info("分析完成!")
        for key, value in summary.items():
            logger.info(f"  {key}: {value}")

        return summary

    except DataProcessingError as e:
        logger.error(f"数据处理错误: {e}")
        return None
    except Exception as e:
        logger.error(f"未知错误: {e}")
        return None
    finally:
        logger.info("分析流程结束")  # finally总是执行

配置文件管理

为什么需要配置文件?

把"会变的东西"和"不会变的东西"分开。

# 不好的做法:硬编码
db_host = "192.168.1.100"    # 换环境就要改代码
db_port = 5432
output_dir = "/data/reports"
threshold = 0.85

# 好的做法:配置文件
# config.yaml
# database:
#   host: "192.168.1.100"
#   port: 5432
# paths:
#   output: "/data/reports"
# analysis:
#   threshold: 0.85

使用Python配置文件

import json
import os

class ConfigManager:
    """配置文件管理器"""

    def __init__(self, config_path: str = "config.json"):
        self.config_path = config_path
        self.config = {}
        self.load()

    def load(self):
        """加载配置文件"""
        if os.path.exists(self.config_path):
            with open(self.config_path, "r", encoding="utf-8") as f:
                self.config = json.load(f)
            logging.info(f"已加载配置: {self.config_path}")
        else:
            logging.warning(f"配置文件不存在: {self.config_path},使用默认配置")
            self.config = self._get_defaults()
            self.save()

    def _get_defaults(self):
        """默认配置"""
        return {
            "data": {
                "source_dir": "./data",
                "output_dir": "./output",
                "file_format": "csv"
            },
            "analysis": {
                "outlier_threshold": 3.0,
                "missing_fill_strategy": "mean",
                "date_columns": ["日期", "时间", "timestamp"]
            },
            "report": {
                "title": "数据分析报告",
                "author": "数据分析系统",
                "format": "html"
            },
            "logging": {
                "level": "INFO",
                "file": "analysis.log"
            }
        }

    def get(self, key_path: str, default=None):
        """
        获取配置值(支持嵌套路径)
        例: config.get("data.source_dir")
        """
        keys = key_path.split(".")
        value = self.config
        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default
        return value

    def set(self, key_path: str, value):
        """设置配置值"""
        keys = key_path.split(".")
        config = self.config
        for key in keys[:-1]:
            if key not in config:
                config[key] = {}
            config = config[key]
        config[keys[-1]] = value

    def save(self):
        """保存配置到文件"""
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(self.config, f, indent=2, ensure_ascii=False)
        logging.info(f"配置已保存: {self.config_path}")


# 使用示例
# config = ConfigManager()
# print(config.get("data.source_dir"))           # 获取值
# config.set("analysis.outlier_threshold", 2.5)  # 设置值
# config.save()                                   # 保存

七、综合实战:完整的数据处理项目

# ========== 综合实战:模块化数据处理系统 ==========

import os
import json
from datetime import datetime

class DataAnalysisProject:
    """
    模块化的数据分析项目
    整合了清洗器、管道、日志、配置等功能
    """

    def __init__(self, project_name: str, config_path: str = None):
        self.project_name = project_name
        self.start_time = datetime.now()
        self.logger = setup_logger(project_name, log_file=f"{project_name}.log")

        # 加载配置
        self.config = ConfigManager(config_path) if config_path else ConfigManager()

        self.logger.info(f"项目 '{project_name}' 已启动")

    def run(self, data: pd.DataFrame) -> Dict:
        """运行完整分析流程"""
        self.logger.info("=" * 50)
        self.logger.info("开始执行分析流程")
        self.logger.info("=" * 50)

        try:
            # Step 1: 数据概览
            self.logger.info("Step 1: 数据概览")
            overview = self._data_overview(data)

            # Step 2: 数据清洗
            self.logger.info("Step 2: 数据清洗")
            clean_data = self._clean_data(data)

            # Step 3: 数据分析
            self.logger.info("Step 3: 数据分析")
            analysis_results = self._analyze_data(clean_data)

            # Step 4: 生成报告
            self.logger.info("Step 4: 生成报告")
            report = self._generate_report(overview, analysis_results)

            # Step 5: 输出结果
            self.logger.info("Step 5: 输出结果")
            output_dir = self.config.get("data.output_dir", "./output")
            os.makedirs(output_dir, exist_ok=True)
            clean_data.to_csv(f"{output_dir}/cleaned_data.csv", index=False)

            with open(f"{output_dir}/analysis_report.json", "w", encoding="utf-8") as f:
                json.dump(report, f, indent=2, ensure_ascii=False, default=str)

            elapsed = (datetime.now() - self.start_time).total_seconds()
            self.logger.info(f"项目完成! 耗时: {elapsed:.1f}秒")

            return report

        except Exception as e:
            self.logger.error(f"分析流程失败: {e}")
            raise

    def _data_overview(self, df: pd.DataFrame) -> Dict:
        """数据概览"""
        return {
            "rows": len(df),
            "columns": len(df.columns),
            "column_names": list(df.columns),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
            "missing_values": int(df.isnull().sum().sum()),
            "duplicate_rows": int(df.duplicated().sum())
        }

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """数据清洗"""
        cleaner = DataCleaner(name=f"{self.project_name}_清洗")
        cleaner.data = df.copy()

        # 去重
        cleaner.remove_duplicates()

        # 填充缺失值
        strategy = self.config.get("analysis.missing_fill_strategy", "mean")
        cleaner.fill_missing(strategy=strategy)

        # 转换日期列
        date_cols = self.config.get("analysis.date_columns", [])
        type_map = {col: "datetime" for col in date_cols if col in df.columns}
        if type_map:
            cleaner.convert_types(type_map)

        return cleaner.data

    def _analyze_data(self, df: pd.DataFrame) -> Dict:
        """数据分析"""
        num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        results = {}

        for col in num_cols:
            results[col] = {
                "count": int(df[col].count()),
                "mean": round(float(df[col].mean()), 2),
                "std": round(float(df[col].std()), 2),
                "min": round(float(df[col].min()), 2),
                "max": round(float(df[col].max()), 2),
                "median": round(float(df[col].median()), 2)
            }

        return results

    def _generate_report(self, overview: Dict, analysis: Dict) -> Dict:
        """生成报告"""
        report = {
            "project": self.project_name,
            "timestamp": datetime.now().isoformat(),
            "data_overview": overview,
            "statistics": analysis,
            "title": self.config.get("report.title", "分析报告"),
            "author": self.config.get("report.author", "数据分析系统")
        }
        return report


# ========== 使用示例 ==========

# 创建模拟数据
sample_df = pd.DataFrame({
    "日期": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05",
             "2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05"],
    "销售额": [100, 200, 150, None, 180, 220, 190, 210, 170, 230],
    "利润": [30, 60, 45, 50, 55, 70, 58, 65, 52, 75],
    "客户数": [10, 25, 18, 20, 22, 30, 24, 28, 21, 32],
    "产品": ["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
})

# 运行项目
project = DataAnalysisProject("销售分析项目")
# report = project.run(sample_df)

实战练习

练习1:封装你的数据清洗类

题目:DataCleaner 的基础上,添加以下功能:

  1. 添加 normalize_columns 方法:对指定列做最大-最小归一化
  2. 添加 outlier_removal 方法:使用IQR方法移除异常值
  3. 添加 export_report 方法:导出清洗报告为JSON
# 参考答案

class EnhancedCleaner(DataCleaner):
    """增强版数据清洗器"""

    def normalize_columns(self, columns: Optional[List[str]] = None) -> "EnhancedCleaner":
        """最大-最小归一化"""
        cols = columns or self.data.select_dtypes(include=[np.number]).columns.tolist()
        for col in cols:
            if col in self.data.columns:
                min_val = self.data[col].min()
                max_val = self.data[col].max()
                if max_val > min_val:
                    self.data[col] = (self.data[col] - min_val) / (max_val - min_val)
        self.log_action("归一化", f"归一化了 {len(cols)} 列")
        return self

    def outlier_removal(self, columns: Optional[List[str]] = None) -> "EnhancedCleaner":
        """使用IQR方法移除异常值"""
        cols = columns or self.data.select_dtypes(include=[np.number]).columns.tolist()
        before = len(self.data)

        for col in cols:
            if col in self.data.columns:
                q1 = self.data[col].quantile(0.25)
                q3 = self.data[col].quantile(0.75)
                iqr = q3 - q1
                lower = q1 - 1.5 * iqr
                upper = q3 + 1.5 * iqr
                self.data = self.data[(self.data[col] >= lower) & (self.data[col] <= upper)]

        removed = before - len(self.data)
        self.log_action("异常值移除", f"使用IQR方法移除了 {removed} 行")
        return self

    def export_report(self, filepath: str) -> str:
        """导出清洗报告为JSON"""
        report = {
            "name": self.name,
            "final_rows": len(self.data),
            "columns": list(self.data.columns),
            "dtypes": {col: str(dtype) for col, dtype in self.data.dtypes.items()},
            "missing_values": {col: int(self.data[col].isnull().sum()) for col in self.data.columns},
            "history": self.history
        }
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, ensure_ascii=False, default=str)
        self.log_action("导出报告", f"已导出至 {filepath}")
        return filepath

本节总结

本节我们从"写脚本"进阶到了"写系统":

  1. 面向对象编程:学会了用类封装数据和处理逻辑,让代码更有组织
  2. 数据清洗器:构建了一个通用的 DataCleaner 类,支持链式调用
  3. Pipeline设计:理解了管道模式,将复杂流程拆分为可复用的步骤
  4. 日志系统:用 logging 替代 print,获得更专业的程序运行记录
  5. 异常处理:学会了自定义异常、安全的错误处理和 finally 的使用
  6. 配置管理:把可变参数外置到配置文件,提高代码的灵活性

关键收获:

  • 好代码的标准:可复用、可维护、可读性好
  • 链式调用(返回self)让数据处理像搭积木一样优雅
  • 日志和异常处理是生产级代码的标配,不是"可选项"
  • 配置与代码分离,让你的系统能适应不同环境

觉得上面的内容有用吗?快来点个赞吧!

点赞() 我要打赏

温馨提示 : 本站内容来自会员投稿以及互联网,所有源码及教程均为作者总结编辑,请大家在使用过程中提前做好备份,以免发生无法预知的错误,源码类教程请勿直接用于生产环境!

 可能感兴趣的文章

1 2 3 4 5