如何优化Python + MySQL应用以适应2GB内存的服务器?

在2GB内存的服务器上优化Python + MySQL应用需要从多个层面进行系统性优化。以下是详细的优化策略:

1. MySQL配置优化

my.cnf配置调整

[mysqld]
# 内存相关设置
innodb_buffer_pool_size = 512M
key_buffer_size = 64M
query_cache_size = 32M   # 注意:查询缓存仅适用于 MySQL 5.7 及更早版本,MySQL 8.0 已移除该参数
tmp_table_size = 32M
max_heap_table_size = 32M

# 连接数优化
max_connections = 50
wait_timeout = 300
interactive_timeout = 300

# 日志和性能
slow_query_log = 1
long_query_time = 2
log_queries_not_using_indexes = 1

# InnoDB优化
innodb_log_file_size = 64M
innodb_flush_log_at_trx_commit = 2
innodb_file_per_table = 1

关键参数说明

  • innodb_buffer_pool_size: 设置为物理内存的25-30%
  • max_connections: 根据实际需求限制连接数
  • innodb_flush_log_at_trx_commit=2: 提高性能(牺牲部分数据安全性)

2. Python应用层优化

数据库连接池管理

import pymysql
from contextlib import contextmanager
from threading import Lock
import queue

class ConnectionPool:
    """Fixed-size blocking pool of pymysql connections.

    Pre-creates ``max_connections`` connections; borrowers block up to
    5 seconds waiting for a free one, which caps this app's footprint on
    both the MySQL server and local memory.
    """

    def __init__(self, host, user, password, database, max_connections=10):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.max_connections = max_connections
        self.pool = queue.Queue(maxsize=max_connections)
        self.lock = Lock()

        # Pre-create every connection up front so runtime borrowing
        # never pays connection-setup latency.
        for _ in range(max_connections):
            self.pool.put(self._create_connection())

    def _create_connection(self):
        """Open one pymysql connection with conservative timeouts."""
        return pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            charset='utf8mb4',
            autocommit=True,
            connect_timeout=10,
            read_timeout=10,
            write_timeout=10
        )

    @contextmanager
    def get_connection(self):
        """Borrow a connection from the pool for the duration of a ``with``.

        Fixes two defects in the previous implementation:
        - the error path put a replacement connection into the pool AND the
          ``finally`` block put it again, so the pool accumulated duplicate
          references to the same connection object;
        - the ``finally`` block silently dropped non-open connections,
          shrinking the pool over time.

        Invariant now: exactly one connection (the borrowed one, or a fresh
        replacement after an error) is returned to the pool per borrow.

        Raises queue.Empty when no connection frees up within 5 seconds.
        """
        conn = self.pool.get(timeout=5)
        try:
            yield conn
        except Exception:
            # The connection may be broken mid-query; discard it and
            # hand a fresh replacement back to the pool.
            try:
                conn.close()
            except Exception:
                pass
            conn = self._create_connection()
            raise
        finally:
            self.pool.put(conn)

# Usage example
pool = ConnectionPool('localhost', 'user', 'password', 'database')


def query_data():
    """Fetch up to 100 user rows through a pooled connection."""
    with pool.get_connection() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users LIMIT 100")
        return cursor.fetchall()

内存友好的数据处理

def process_large_dataset():
    """Process the whole users table in fixed-size batches to bound memory.

    Uses keyset pagination (``WHERE id > last_id ORDER BY id``) instead of
    ``LIMIT ... OFFSET ...``: with OFFSET the server must scan and discard
    every skipped row, so each successive page gets slower and the full
    scan degrades to O(n^2). Keyset pagination keeps every page O(batch).

    NOTE(review): assumes ``id`` is a monotonically increasing primary key
    — confirm against the schema before reuse.
    """
    import gc  # hoisted out of the loop; import is cached after first use

    batch_size = 1000
    last_id = 0

    while True:
        query = (
            "SELECT id, name, email FROM users "
            "WHERE id > %s ORDER BY id LIMIT %s"
        )

        with pool.get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(query, (last_id, batch_size))
                results = cursor.fetchall()

        if not results:
            break

        # Process the current batch row by row.
        for row in results:
            process_row(row)

        # First selected column is `id`; continue after the last seen key.
        last_id = results[-1][0]

        # Proactively release per-batch garbage on a small-memory box.
        gc.collect()

def process_with_generator():
    """Stream rows one at a time via a server-side (unbuffered) cursor."""

    def _stream_rows():
        with pool.get_connection() as conn:
            # SSCursor leaves the result set on the MySQL server and
            # fetches rows on demand instead of buffering them all locally.
            with conn.cursor(pymysql.cursors.SSCursor) as cursor:
                cursor.execute("SELECT * FROM large_table")
                row = cursor.fetchone()
                while row is not None:
                    yield row
                    row = cursor.fetchone()

    for record in _stream_rows():
        process_single_row(record)

3. 查询优化

索引优化

-- 创建合适的索引
CREATE INDEX idx_user_status ON users(status);
CREATE INDEX idx_user_created ON users(created_at);
CREATE INDEX idx_user_email ON users(email);

-- 复合索引
CREATE INDEX idx_user_status_created ON users(status, created_at);

优化的查询语句

def optimized_queries():
    """Illustrative query patterns that reduce load on a small server.

    NOTE(review): ``cursor`` and ``batch_data`` are not defined in this
    snippet — it assumes an open DB-API cursor and a prepared batch are
    already in scope; confirm before reusing verbatim.
    """
    # 1. Select only the columns you need (smaller result set, less I/O)
    cursor.execute("SELECT id, name, email FROM users WHERE status = %s", ['active'])

    # 2. Prefer EXISTS over IN (can stop at the first matching order row)
    cursor.execute("""
        SELECT u.id, u.name 
        FROM users u 
        WHERE EXISTS (
            SELECT 1 FROM orders o 
            WHERE o.user_id = u.id AND o.status = 'completed'
        )
    """)

    # 3. Avoid SELECT * (here only an aggregate is needed)
    cursor.execute("SELECT COUNT(*) FROM users WHERE age > %s", [18])

    # 4. Batch inserts with executemany (one round trip per batch)
    stmt = "INSERT INTO logs (user_id, action, timestamp) VALUES (%s, %s, %s)"
    cursor.executemany(stmt, batch_data)

4. 缓存策略

Redis缓存实现

import redis
import json
from functools import wraps

class CacheManager:
    """Thin Redis-backed memoization layer for JSON-serializable results."""

    def __init__(self, host='localhost', port=6379, db=0):
        # decode_responses=True: GET returns str (or None), not bytes.
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    def cache_result(self, ttl=300):  # default: 5 minutes
        """Decorator factory: cache the wrapped function's JSON-serializable
        return value in Redis for ``ttl`` seconds.

        Fix: the previous key used built-in ``hash()``, which is randomized
        per process (PYTHONHASHSEED), so keys never matched across restarts
        or multiple workers — the shared cache never hit. The key is now a
        deterministic repr of the arguments (kwargs sorted so call order
        doesn't matter).
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Deterministic cache key, stable across processes.
                cache_key = f"{func.__name__}:{args!r}:{sorted(kwargs.items())!r}"

                # Probe the cache; `is not None` so any cached payload hits.
                cached_result = self.redis_client.get(cache_key)
                if cached_result is not None:
                    return json.loads(cached_result)

                # Miss: compute, then store with expiry.
                result = func(*args, **kwargs)
                self.redis_client.setex(cache_key, ttl, json.dumps(result))
                return result
            return wrapper
        return decorator

# Usage example
cache_manager = CacheManager()


@cache_manager.cache_result(ttl=600)
def get_user_profile(user_id):
    """Fetch one user row; results are cached in Redis for 10 minutes."""
    with pool.get_connection() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users WHERE id = %s", [user_id])
        return cursor.fetchone()

5. 内存监控和垃圾回收

内存监控工具

import psutil
import os
import logging
from functools import wraps

class MemoryMonitor:
    """Warn (and let callers react) when the process RSS crosses a threshold."""

    def __init__(self, threshold_mb=1500):
        # Threshold in MiB; 1500 leaves headroom on a 2GB host.
        self.threshold_mb = threshold_mb
        self.process = psutil.Process(os.getpid())

    def check_memory_usage(self):
        """Return True (and log a warning) when RSS exceeds the threshold."""
        rss_mb = self.process.memory_info().rss / (1024 * 1024)
        if rss_mb <= self.threshold_mb:
            return False
        logging.warning(f"Memory usage high: {rss_mb:.2f}MB")
        return True

    def memory_guard(self, func):
        """Decorator: force a GC pass before running `func` when memory is high."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            if self.check_memory_usage():
                import gc
                gc.collect()
                # Could also degrade gracefully or reject the request here.
            return func(*args, **kwargs)
        return wrapper

# Install the memory guard on heavy entry points.
monitor = MemoryMonitor()


@monitor.memory_guard
def heavy_operation():
    """Placeholder for memory-intensive business logic."""
    pass

6. 异步处理优化

使用asyncio和aiomysql

import asyncio
import aiomysql

class AsyncDatabase:
    """Small async wrapper around an aiomysql connection pool."""

    def __init__(self, host, user, password, database):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.pool = None  # created lazily by init_pool()

    async def init_pool(self):
        """Create the shared pool; keep it tiny to bound memory use."""
        self.pool = await aiomysql.create_pool(
            host=self.host,
            port=3306,
            user=self.user,
            password=self.password,
            db=self.database,
            minsize=1,
            maxsize=5,
            autocommit=True,
        )

    async def execute_query(self, query, params=None):
        """Run one query on a pooled connection and return all rows."""
        async with self.pool.acquire() as conn, conn.cursor() as cur:
            await cur.execute(query, params)
            return await cur.fetchall()

    async def close(self):
        """Dispose of the pool and wait for its connections to shut down."""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()

# Usage example
async def main():
    db = AsyncDatabase('localhost', 'user', 'password', 'database')
    await db.init_pool()
    try:
        rows = await db.execute_query("SELECT * FROM users LIMIT 100")
        print(rows)
    finally:
        # Always release the pool, even if the query fails.
        await db.close()

# Run the coroutine with:
# asyncio.run(main())

7. 系统级优化建议

Linux系统优化

# 调整内核参数
echo 'vm.swappiness=10' >> /etc/sysctl.conf
echo 'vm.vfs_cache_pressure=50' >> /etc/sysctl.conf

# 优化文件描述符限制
ulimit -n 65536

# 清理不必要的服务
systemctl disable bluetooth
systemctl disable cups

定期维护脚本

import subprocess
import logging
from datetime import datetime

def cleanup_system():
    """定期清理系统"""
    try:
        # 清理MySQL二进制日志
        subprocess.run([
            "mysql", "-u", "root", "-p", "password",
            "-e", "PURGE BINARY LOGS BEFORE DATE_SUB(NOW(), INTERVAL 7 DAY);"
        ])

        # 优化表
        subprocess.run([
            "mysqlcheck", "-u", "root", "-p", "password",
            "--optimize", "your_database"
        ])

        logging.info(f"Cleanup completed at {datetime.now()}")
    except Exception as e:
        logging.error(f"Cleanup failed: {e}")

8. 监控和告警

基础监控脚本

import time
import smtplib
from email.mime.text import MIMEText

def monitor_resources():
    """Poll CPU and memory usage; alert when either crosses its limit."""
    cpu = psutil.cpu_percent()
    mem = psutil.virtual_memory().percent

    # 85% memory / 80% CPU leaves some headroom on a 2GB box.
    if mem > 85 or cpu > 80:
        send_alert(f"High resource usage: CPU {cpu}%, Memory {mem}%")

def send_alert(message):
    """Best-effort email alert to the admin; never raises.

    Fixes: the original used a bare ``except:`` (which also swallows
    KeyboardInterrupt/SystemExit), had no SMTP timeout (could hang the
    monitor), and dropped failures with no trace. Delivery stays
    best-effort, but failures are now logged.
    """
    msg = MIMEText(message)
    msg['Subject'] = 'Server Alert'
    msg['From'] = 'alert@yourdomain.com'
    msg['To'] = 'admin@yourdomain.com'

    try:
        # Context manager guarantees QUIT/close even on send errors.
        with smtplib.SMTP('localhost', timeout=10) as server:
            server.send_message(msg)
    except Exception as e:
        # Monitoring must never take down the app it watches —
        # record the delivery failure and move on.
        logging.warning(f"Alert email failed: {e}")

通过以上综合优化策略,可以在2GB内存的服务器上有效运行Python + MySQL应用。关键是要:

  1. 合理配置MySQL内存参数
  2. 使用连接池管理数据库连接
  3. 采用分批处理大数据集
  4. 实施适当的缓存策略
  5. 监控内存使用并及时清理
  6. 优化查询语句和索引
未经允许不得转载:CLOUD云枢 » 如何优化Python + MySQL应用以适应2GB内存的服务器?