在2GB内存的服务器上优化Python + MySQL应用需要从多个层面进行系统性优化。以下是详细的优化策略:
1. MySQL配置优化
my.cnf配置调整
[mysqld]
# --- Memory settings: total MySQL footprint must fit the 2GB host ---
# Buffer pool = ~25% of physical RAM, leaving room for Python + OS.
innodb_buffer_pool_size = 512M
key_buffer_size = 64M
# NOTE(review): query_cache_size was removed in MySQL 8.0 — drop this
# line on 8.0+ or the server will refuse to start.
query_cache_size = 32M
tmp_table_size = 32M
max_heap_table_size = 32M
# --- Connection limits: each connection costs per-thread memory ---
max_connections = 50
wait_timeout = 300
interactive_timeout = 300
# --- Logging / diagnostics: capture slow and unindexed queries ---
slow_query_log = 1
long_query_time = 2
log_queries_not_using_indexes = 1
# --- InnoDB: flush log once per second (trades up to ~1s of commits
# on crash for much better write throughput) ---
innodb_log_file_size = 64M
innodb_flush_log_at_trx_commit = 2
innodb_file_per_table = 1
关键参数说明
- innodb_buffer_pool_size: 设置为物理内存的25-30%
- max_connections: 根据实际需求限制连接数
- innodb_flush_log_at_trx_commit=2: 提高性能(牺牲部分数据安全性)
2. Python应用层优化
数据库连接池管理
import pymysql
from contextlib import contextmanager
from threading import Lock
import queue
class ConnectionPool:
    """Fixed-size pool of pymysql connections, safe to share across threads.

    All connections are created eagerly in ``__init__`` and recycled
    through a ``queue.Queue`` (itself thread-safe), so the pool never
    exceeds ``max_connections`` — important on a 2GB host.
    """

    def __init__(self, host, user, password, database, max_connections=10):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.max_connections = max_connections
        self.pool = queue.Queue(maxsize=max_connections)
        self.lock = Lock()
        # Pre-create every connection so requests never pay setup cost.
        for _ in range(max_connections):
            self.pool.put(self._create_connection())

    def _create_connection(self):
        """Open one new MySQL connection with conservative timeouts."""
        return pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            charset='utf8mb4',
            autocommit=True,
            connect_timeout=10,
            read_timeout=10,
            write_timeout=10,
        )

    @contextmanager
    def get_connection(self):
        """Borrow a connection for the duration of a ``with`` block.

        Exactly one connection is returned to the pool per borrow.  The
        original implementation put a replacement connection into the
        pool in the ``except`` branch AND put the same connection again
        in ``finally``, so every error grew the pool past its limit; a
        ``queue.Empty`` timeout likewise injected an extra connection.

        Raises:
            queue.Empty: if no connection frees up within 5 seconds.
        """
        conn = self.pool.get(timeout=5)  # raises before try -> no replacement
        try:
            yield conn
        except Exception:
            # The connection may be broken mid-protocol: discard it.
            conn.close()
            conn = None
            raise
        finally:
            if conn is not None and conn.open:
                self.pool.put(conn)
            else:
                # Replace a closed/discarded connection to keep pool size.
                self.pool.put(self._create_connection())
# Usage example
# NOTE(review): this connects to MySQL at import time with placeholder
# credentials — in a real app, build the pool from config at startup.
pool = ConnectionPool('localhost', 'user', 'password', 'database')
def query_data():
    """Fetch up to 100 user rows using a pooled connection."""
    with pool.get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM users LIMIT 100")
            return cursor.fetchall()
内存友好的数据处理
def process_large_dataset():
    """Process the ``users`` table in fixed-size batches to bound memory.

    Uses keyset pagination (``WHERE id > last_seen ORDER BY id``) instead
    of ``LIMIT/OFFSET``: OFFSET makes MySQL scan and discard every
    preceding row on each page (O(n^2) overall), and without an ORDER BY
    the page contents are not even deterministic, so rows could be
    skipped or processed twice.  Requires ``id`` to be a monotonically
    increasing key, which the original code also implicitly assumed.
    """
    import gc  # hoisted: the original re-imported gc on every iteration

    batch_size = 1000
    last_seen_id = 0
    query = (
        "SELECT id, name, email FROM users "
        "WHERE id > %s ORDER BY id LIMIT %s"
    )
    while True:
        with pool.get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(query, (last_seen_id, batch_size))
                results = cursor.fetchall()
        if not results:
            break
        for row in results:
            process_row(row)
        # First column of the last row is the highest id processed.
        last_seen_id = results[-1][0]
        # Drop the processed batch before fetching the next one.
        del results
        gc.collect()
def process_with_generator():
    """Stream a large table row by row without loading it into memory."""

    def stream_rows():
        # SSCursor is an unbuffered (server-side) cursor: rows arrive
        # from the server one at a time instead of being fetched at once.
        with pool.get_connection() as conn:
            with conn.cursor(pymysql.cursors.SSCursor) as cursor:
                cursor.execute("SELECT * FROM large_table")
                # iter(callable, sentinel) stops when fetchone() is None.
                yield from iter(cursor.fetchone, None)

    for record in stream_rows():
        process_single_row(record)
3. 查询优化
索引优化
-- Single-column indexes for the common filter columns
CREATE INDEX idx_user_status ON users(status);
CREATE INDEX idx_user_created ON users(created_at);
CREATE INDEX idx_user_email ON users(email);
-- Composite index: serves WHERE status = ? combined with created_at
-- filtering/sorting in one index scan (leftmost-prefix rule)
CREATE INDEX idx_user_status_created ON users(status, created_at);
优化的查询语句
def optimized_queries(cursor, batch_data=()):
    """Demonstrate memory- and index-friendly query patterns.

    Args:
        cursor: an open DB-API cursor.  The original body referenced an
            undefined global ``cursor``; it is now an explicit argument.
        batch_data: sequence of (user_id, action, timestamp) tuples for
            the bulk-insert example (was an undefined global); defaults
            to empty, which makes executemany a no-op.
    """
    # 1. Select only the columns you need — smaller result set, and the
    #    query can be served entirely from a covering index.
    cursor.execute("SELECT id, name, email FROM users WHERE status = %s", ['active'])
    # 2. EXISTS instead of IN: stops at the first matching order per user.
    cursor.execute("""
        SELECT u.id, u.name
        FROM users u
        WHERE EXISTS (
            SELECT 1 FROM orders o
            WHERE o.user_id = u.id AND o.status = 'completed'
        )
    """)
    # 3. Aggregate on the server instead of SELECT * + counting in Python.
    cursor.execute("SELECT COUNT(*) FROM users WHERE age > %s", [18])
    # 4. One parameterized statement for the whole batch.
    stmt = "INSERT INTO logs (user_id, action, timestamp) VALUES (%s, %s, %s)"
    cursor.executemany(stmt, batch_data)
4. 缓存策略
Redis缓存实现
import redis
import json
from functools import wraps
class CacheManager:
    """Redis-backed cache for JSON-serializable function results."""

    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    def cache_result(self, ttl=300):  # default: 5 minutes
        """Decorator that caches the wrapped function's result in Redis.

        Args:
            ttl: cache lifetime in seconds.

        The cache key is derived with ``hashlib.sha256`` rather than the
        builtin ``hash()``: string hashing is randomized per process
        (PYTHONHASHSEED), so ``hash()``-based keys would never match
        across restarts or between worker processes sharing one Redis.
        The wrapped function's result must be JSON-serializable; note
        that tuples come back as lists after a cache hit.
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                import hashlib
                # Stable key: repr of args plus kwargs in sorted order.
                raw_key = f"{args!r}:{sorted(kwargs.items())!r}"
                digest = hashlib.sha256(raw_key.encode('utf-8')).hexdigest()
                cache_key = f"{func.__name__}:{digest}"
                cached_result = self.redis_client.get(cache_key)
                # `is not None` so legitimately falsy cached results
                # ("", 0, [], false) still count as cache hits.
                if cached_result is not None:
                    return json.loads(cached_result)
                result = func(*args, **kwargs)
                self.redis_client.setex(cache_key, ttl, json.dumps(result))
                return result
            return wrapper
        return decorator
# Usage example
cache_manager = CacheManager()
@cache_manager.cache_result(ttl=600)  # cache profiles for 10 minutes
def get_user_profile(user_id):
    """Return one user row by id, served from Redis when cached.

    NOTE(review): on a cache hit the row comes back through
    json.loads, so a tuple row is returned as a list.
    """
    with pool.get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM users WHERE id = %s", [user_id])
            return cursor.fetchone()
5. 内存监控和垃圾回收
内存监控工具
import psutil
import os
import logging
from functools import wraps
class MemoryMonitor:
    """Watch this process's resident memory and warn past a threshold."""

    def __init__(self, threshold_mb=1500):
        self.threshold_mb = threshold_mb
        self.process = psutil.Process(os.getpid())

    def check_memory_usage(self):
        """Return True (and log a warning) when RSS exceeds the threshold."""
        rss_bytes = self.process.memory_info().rss
        memory_mb = rss_bytes / (1024 * 1024)
        over_limit = memory_mb > self.threshold_mb
        if over_limit:
            logging.warning(f"Memory usage high: {memory_mb:.2f}MB")
        return over_limit

    def memory_guard(self, func):
        """Decorator: run a gc pass before the call if memory is high."""
        import gc

        @wraps(func)
        def wrapper(*args, **kwargs):
            if self.check_memory_usage():
                gc.collect()
                # A real deployment might also degrade or reject here.
            return func(*args, **kwargs)

        return wrapper
# Apply memory monitoring to heavy entry points
monitor = MemoryMonitor()
@monitor.memory_guard
def heavy_operation():
    # Your business logic here; the guard runs gc first if memory is high.
    pass
6. 异步处理优化
使用asyncio和aiomysql
import asyncio
import aiomysql
class AsyncDatabase:
    """Thin asyncio wrapper around an aiomysql connection pool."""

    def __init__(self, host, user, password, database):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        # Created lazily by init_pool(); None until then.
        self.pool = None

    async def init_pool(self):
        """Create the pool (small min/max sizes to respect 2GB RAM)."""
        self.pool = await aiomysql.create_pool(
            host=self.host,
            port=3306,
            user=self.user,
            password=self.password,
            db=self.database,
            minsize=1,
            maxsize=5,
            autocommit=True,
        )

    async def execute_query(self, query, params=None):
        """Run one parameterized query and return every row."""
        async with self.pool.acquire() as connection:
            async with connection.cursor() as cursor:
                await cursor.execute(query, params)
                rows = await cursor.fetchall()
                return rows

    async def close(self):
        """Shut the pool down and wait for its connections to close."""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
# Usage example
async def main():
    """Demonstrate pool setup, one query, and clean shutdown."""
    db = AsyncDatabase('localhost', 'user', 'password', 'database')
    await db.init_pool()
    try:
        results = await db.execute_query("SELECT * FROM users LIMIT 100")
        print(results)
    finally:
        # Always release pool connections, even if the query fails.
        await db.close()
# Run the async entry point
# asyncio.run(main())
7. 系统级优化建议
Linux系统优化
# Kernel tuning: prefer keeping app memory resident over swapping,
# and keep dentry/inode cache pressure moderate
echo 'vm.swappiness=10' >> /etc/sysctl.conf
echo 'vm.vfs_cache_pressure=50' >> /etc/sysctl.conf
# Raise the open-file-descriptor limit
# NOTE(review): ulimit affects only the current shell session; persist
# it via /etc/security/limits.conf or the service unit's LimitNOFILE
ulimit -n 65536
# Disable services that waste RAM on a headless DB server
systemctl disable bluetooth
systemctl disable cups
定期维护脚本
import subprocess
import logging
from datetime import datetime
def cleanup_system():
    """Periodic maintenance: purge old binlogs and optimize tables.

    Bug fixed: the original invoked ``mysql -u root -p password`` — the
    mysql client's ``-p`` takes no separate argument, so it would
    *prompt* for a password and treat "password" as the database name.
    The password must be attached (``-ppassword``) or passed as
    ``--password=...``.  ``check=True`` is added so non-zero exit codes
    surface as errors instead of being silently ignored.

    NOTE(review): a command-line password is visible in ``ps``; prefer a
    ~/.my.cnf [client] section or MYSQL_PWD in production.
    """
    try:
        # Purge binary logs older than 7 days to reclaim disk space.
        subprocess.run(
            [
                "mysql", "-u", "root", "--password=password",
                "-e", "PURGE BINARY LOGS BEFORE DATE_SUB(NOW(), INTERVAL 7 DAY);",
            ],
            check=True,
        )
        # Defragment/optimize all tables in the application database.
        subprocess.run(
            [
                "mysqlcheck", "-u", "root", "--password=password",
                "--optimize", "your_database",
            ],
            check=True,
        )
        logging.info(f"Cleanup completed at {datetime.now()}")
    except Exception as e:
        # Best-effort maintenance: record the failure, don't crash the job.
        logging.error(f"Cleanup failed: {e}")
8. 监控和告警
基础监控脚本
import time
import smtplib
from email.mime.text import MIMEText
def monitor_resources():
    """Sample CPU and memory usage; send an alert when either is high."""
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent
    # Thresholds: alert above 85% memory or 80% CPU.
    threshold_exceeded = memory_percent > 85 or cpu_percent > 80
    if threshold_exceeded:
        send_alert(f"High resource usage: CPU {cpu_percent}%, Memory {memory_percent}%")
def send_alert(message):
    """Email *message* to the admin via local SMTP, best-effort.

    Args:
        message: plain-text alert body.

    Delivery failures are logged instead of raised — but no longer
    swallowed by a bare ``except: pass``, which hid every error
    (including KeyboardInterrupt/SystemExit) with zero diagnostics.
    """
    msg = MIMEText(message)
    msg['Subject'] = 'Server Alert'
    msg['From'] = 'alert@yourdomain.com'
    msg['To'] = 'admin@yourdomain.com'
    try:
        # Context manager quits the SMTP session even if sending fails.
        with smtplib.SMTP('localhost') as server:
            server.send_message(msg)
    except Exception:
        # Alerting stays best-effort, but the failure is now recorded
        # with a traceback for later diagnosis.
        logging.exception("Failed to send alert email")
通过以上综合优化策略,可以在2GB内存的服务器上有效运行Python + MySQL应用。关键是要:
- 合理配置MySQL内存参数
- 使用连接池管理数据库连接
- 采用分批处理大数据集
- 实施适当的缓存策略
- 监控内存使用并及时清理
- 优化查询语句和索引
CLOUD云枢