Time-series splitting
Time-Series Splitting trong Machine Learning

Giới thiệu
Khi làm việc với dữ liệu chuỗi thời gian (time series data), các phương pháp chia dữ liệu truyền thống như random splitting hay ngay cả stratified splitting thường không phù hợp. Lý do là dữ liệu chuỗi thời gian có tính tuần tự và phụ thuộc thời gian, vì vậy cần các phương pháp chia dữ liệu đặc biệt để đảm bảo tính toàn vẹn của mô hình dự báo.
Time-Series Splitting là một kỹ thuật chia dữ liệu được thiết kế riêng cho dữ liệu chuỗi thời gian, giúp đảm bảo:
Tính thời gian tuần tự được bảo toàn
Không có hiện tượng data leakage
Mô phỏng chính xác cách mô hình sẽ được sử dụng trong thực tế
Bài viết này sẽ giới thiệu các phương pháp time-series splitting phổ biến, cách triển khai, và các lưu ý quan trọng khi áp dụng.
Tại sao không thể sử dụng Random Splitting cho dữ liệu chuỗi thời gian?
Dữ liệu chuỗi thời gian có những đặc điểm riêng biệt khiến random splitting trở nên không phù hợp:
Tính tuần tự: Các quan sát trong chuỗi thời gian không độc lập, mà thường có mối tương quan với các quan sát trước đó.
Data leakage: Random splitting có thể dẫn đến sử dụng dữ liệu tương lai để dự đoán quá khứ, một hiện tượng gọi là "future leakage" hoặc "look-ahead bias".
Tính thay đổi theo thời gian: Dữ liệu chuỗi thời gian thường có tính không dừng (non-stationary), với các xu hướng và mùa vụ thay đổi theo thời gian.
Minh họa vấn đề của Random Splitting
Giả sử chúng ta có dữ liệu giá cổ phiếu trong 2 năm. Nếu sử dụng random splitting:
Có thể huấn luyện mô hình trên dữ liệu tháng 12/2023 và kiểm tra trên dữ liệu tháng 6/2023
Điều này vi phạm nguyên tắc cơ bản của dự báo: chúng ta không thể biết tương lai để dự đoán quá khứ
Các phương pháp Time-Series Splitting
1. Simple Forward Chaining (Train-Test Split dựa trên thời gian)
Đây là phương pháp đơn giản nhất: chọn một điểm cắt thời gian và sử dụng tất cả dữ liệu trước điểm đó làm tập huấn luyện, dữ liệu sau làm tập kiểm tra.
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
# Giả sử df là DataFrame có cột 'date' và các đặc trưng, mục tiêu
# Sắp xếp dữ liệu theo thời gian
df = df.sort_values('date')
# Chọn ngày cắt (ví dụ: 80% dữ liệu cho training)
cut_date = df['date'].iloc[int(len(df) * 0.8)]
# Chia dữ liệu
train_df = df[df['date'] <= cut_date]
test_df = df[df['date'] > cut_date]
# Tách features và target
X_train = train_df.drop(['date', 'target'], axis=1)
y_train = train_df['target']
X_test = test_df.drop(['date', 'target'], axis=1)
y_test = test_df['target']Ưu điểm:
Đơn giản, dễ hiểu
Mô phỏng chính xác quá trình dự báo thực tế
Nhược điểm:
Chỉ có một tập kiểm tra, không đánh giá được tính ổn định của mô hình
Không tận dụng tất cả dữ liệu cho việc đánh giá
2. Expanding Window (Growing Window)
Phương pháp này bắt đầu với một tập huấn luyện nhỏ và dần dần mở rộng cửa sổ huấn luyện, trong khi luôn sử dụng một khoảng thời gian cố định tiếp theo làm tập kiểm tra.
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error
# Sắp xếp dữ liệu theo thời gian
df = df.sort_values('date')
# Thiết lập các tham số
initial_train_size = int(len(df) * 0.5) # Bắt đầu với 50% dữ liệu
horizon = 30 # Dự báo 30 ngày tiếp theo
step = 30 # Bước nhảy 30 ngày
# Chuẩn bị features và target
X = df.drop(['date', 'target'], axis=1)
y = df['target']
dates = df['date']
results = []
# Vòng lặp expanding window
for i in range(initial_train_size, len(df) - horizon, step):
# Tập huấn luyện từ đầu đến điểm hiện tại
train_indices = list(range(0, i))
# Tập kiểm tra là horizon tiếp theo
test_indices = list(range(i, i + horizon))
# Tách dữ liệu
X_train, X_test = X.iloc[train_indices], X.iloc[test_indices]
y_train, y_test = y.iloc[train_indices], y.iloc[test_indices]
# Huấn luyện mô hình (ví dụ RandomForest)
model = RandomForestRegressor(random_state=42)
model.fit(X_train, y_train)
# Dự đoán và đánh giá
y_pred = model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
results.append({
'train_start': dates.iloc[0],
'train_end': dates.iloc[i-1],
'test_start': dates.iloc[i],
'test_end': dates.iloc[i+horizon-1],
'train_size': len(train_indices),
'test_size': len(test_indices),
'rmse': rmse
})
# Kết quả đánh giá
results_df = pd.DataFrame(results)Ưu điểm:
Tận dụng tất cả dữ liệu lịch sử có sẵn tại mỗi thời điểm
Mô phỏng cách mô hình được cập nhật trong thực tế khi có thêm dữ liệu mới
Đánh giá hiệu suất của mô hình theo thời gian
Nhược điểm:
Các mô hình sau sử dụng nhiều dữ liệu hơn, khó so sánh công bằng
Có thể chậm nếu dữ liệu lớn và số lần fold nhiều
3. Sliding Window (Rolling Window)
Phương pháp này duy trì kích thước cửa sổ huấn luyện cố định, chỉ "trượt" cửa sổ theo thời gian.
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error
# Sắp xếp dữ liệu theo thời gian
df = df.sort_values('date')
# Thiết lập các tham số
window_size = 365 # Cửa sổ huấn luyện 365 ngày
horizon = 30 # Dự báo 30 ngày tiếp theo
step = 30 # Bước trượt 30 ngày
# Chuẩn bị features và target
X = df.drop(['date', 'target'], axis=1)
y = df['target']
dates = df['date']
results = []
# Vòng lặp sliding window
for i in range(window_size, len(df) - horizon, step):
# Tập huấn luyện là cửa sổ có kích thước cố định
train_indices = list(range(i - window_size, i))
# Tập kiểm tra là horizon tiếp theo
test_indices = list(range(i, i + horizon))
# Tách dữ liệu
X_train, X_test = X.iloc[train_indices], X.iloc[test_indices]
y_train, y_test = y.iloc[train_indices], y.iloc[test_indices]
# Huấn luyện mô hình (ví dụ RandomForest)
model = RandomForestRegressor(random_state=42)
model.fit(X_train, y_train)
# Dự đoán và đánh giá
y_pred = model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
results.append({
'train_start': dates.iloc[i-window_size],
'train_end': dates.iloc[i-1],
'test_start': dates.iloc[i],
'test_end': dates.iloc[i+horizon-1],
'rmse': rmse
})
# Kết quả đánh giá
results_df = pd.DataFrame(results)Ưu điểm:
Các mô hình sử dụng cùng lượng dữ liệu huấn luyện, tạo sự đồng nhất
Giảm ảnh hưởng của dữ liệu quá cũ, tập trung vào xu hướng gần đây
Hiệu quả khi dữ liệu không dừng (non-stationary)
Nhược điểm:
Không sử dụng tất cả dữ liệu lịch sử có sẵn
Có thể bỏ qua các pattern dài hạn
4. Time Series Split trong Scikit-learn
Scikit-learn cung cấp lớp TimeSeriesSplit để thực hiện cross-validation cho dữ liệu chuỗi thời gian:
from sklearn.model_selection import TimeSeriesSplit
import numpy as np
import matplotlib.pyplot as plt
# Thiết lập TimeSeriesSplit với 5 folds
tscv = TimeSeriesSplit(n_splits=5)
# Minh họa cách chia dữ liệu
X = np.array(range(100)) # Ví dụ đơn giản với 100 điểm dữ liệu
fig, ax = plt.subplots(figsize=(10, 8))
for i, (train_index, test_index) in enumerate(tscv.split(X)):
# Tô màu cho train set
ax.scatter(train_index, [i+0.1]*len(train_index), c='blue', marker='o', s=10, label='Train' if i == 0 else "")
# Tô màu cho test set
ax.scatter(test_index, [i+0.1]*len(test_index), c='red', marker='o', s=10, label='Test' if i == 0 else "")
ax.set_xlabel('Sample Index')
ax.set_ylabel('CV Iteration')
ax.set_title('Time Series Cross Validation')
ax.legend()
plt.tight_layout()
plt.show()
# Sử dụng TimeSeriesSplit để train và đánh giá mô hình
from sklearn.metrics import mean_squared_error
scores = []
for train_index, test_index in tscv.split(X):
X_train, X_test = X[train_index], X[test_index]
y_train, y_test = y[train_index], y[test_index]
model = RandomForestRegressor(random_state=42)
model.fit(X_train.reshape(-1, 1), y_train)
y_pred = model.predict(X_test.reshape(-1, 1))
mse = mean_squared_error(y_test, y_pred)
scores.append(mse)
print(f"Mean MSE: {np.mean(scores)}")
print(f"MSE per fold: {scores}")Ưu điểm:
Tích hợp sẵn trong scikit-learn
Dễ kết hợp với GridSearchCV và RandomizedSearchCV cho hyperparameter tuning
Tự động triển khai expanding window
Nhược điểm:
Không cung cấp tùy chọn cho sliding window
Hạn chế về tùy chỉnh kích thước test set
5. Multiple Temporal Splits (Blocked Time Series Split)
Phương pháp này kết hợp cross-validation với time series splitting bằng cách chia dữ liệu thành nhiều khối (blocks) không chồng chéo.
from sklearn.model_selection import KFold
import numpy as np
class BlockTimeSeriesSplit:
def __init__(self, n_splits=5):
self.n_splits = n_splits
def split(self, X, y=None, groups=None):
n_samples = len(X)
# Kích thước mỗi khối
block_size = n_samples // self.n_splits
indices = np.arange(n_samples)
for i in range(self.n_splits - 1):
# Xác định block hiện tại làm test set
test_start = (i + 1) * block_size
test_end = (i + 2) * block_size
# Tất cả dữ liệu trước block hiện tại làm train set
train_indices = indices[:test_start]
test_indices = indices[test_start:test_end]
yield train_indices, test_indices
# Sử dụng
bts = BlockTimeSeriesSplit(n_splits=5)
for train_idx, test_idx in bts.split(X):
print(f"Train size: {len(train_idx)}, Test size: {len(test_idx)}")Ưu điểm:
Tất cả các fold đều độc lập
Đơn giản để hiểu và triển khai
Nhược điểm:
Không phản ánh chính xác cách mô hình được sử dụng trong thực tế
Các fold sau có thể ít dữ liệu huấn luyện hơn
6. Cross-validation với Purging và Embargo
Khi làm việc với dữ liệu tài chính, thường cần xử lý vấn đề chồng chéo (overlapping) và nhiễu thị trường. Hai kỹ thuật quan trọng là:
Purging: Xóa bỏ các mẫu có cùng thời gian hoặc nhãn với dữ liệu test
Embargo: Xóa bỏ một số mẫu liền kề trước và sau khoảng test để giảm nhiễu
import numpy as np
import pandas as pd
def purged_embargo_split(X, y, embargo_size=5):
"""
Chia dữ liệu với kỹ thuật purging và embargo
Parameters:
X: DataFrame với index là timestamp
y: Series với index là timestamp
embargo_size: Số mẫu embargo trước và sau test set
Returns:
Generator trả về train_indices và test_indices
"""
# Sắp xếp theo thời gian
if not isinstance(X.index, pd.DatetimeIndex):
raise ValueError("X.index phải là DatetimeIndex")
# Thời gian của mỗi mẫu
timestamps = X.index.tolist()
# Kích thước của test set
test_size = len(X) // 5 # 20% dữ liệu
for i in range(0, len(X), test_size):
# Nếu không đủ dữ liệu cho test set cuối cùng
if i + test_size > len(X):
break
# Test indices
test_indices = list(range(i, i + test_size))
# Train indices (tất cả ngoại trừ test và embargo)
all_indices = set(range(len(X)))
# Embargo indices (các mẫu trước và sau test set)
embargo_before = set(range(max(0, i - embargo_size), i))
embargo_after = set(range(i + test_size, min(i + test_size + embargo_size, len(X))))
# Loại bỏ test và embargo khỏi train
train_indices = list(all_indices - set(test_indices) - embargo_before - embargo_after)
yield train_indices, test_indicesƯu điểm:
Giảm thiểu data leakage và nhiễu thị trường
Đặc biệt hữu ích cho dữ liệu tài chính
Nhược điểm:
Giảm kích thước dữ liệu huấn luyện
Cần hiểu rõ về cấu trúc dữ liệu và mối tương quan thời gian
Hướng dẫn chọn phương pháp Time-Series Splitting phù hợp
Lựa chọn phương pháp time-series splitting phù hợp phụ thuộc vào nhiều yếu tố:
Khối lượng dữ liệu:
Dữ liệu ít: Expanding window để tận dụng tối đa dữ liệu
Dữ liệu nhiều: Sliding window để tập trung vào xu hướng gần đây
Tính dừng của dữ liệu:
Dữ liệu dừng (stationary): Expanding window
Dữ liệu không dừng (non-stationary): Sliding window
Mục đích dự báo:
Dự báo ngắn hạn: Sliding window với cửa sổ nhỏ
Dự báo dài hạn: Expanding window để nắm bắt xu hướng dài hạn
Chu kỳ dữ liệu:
Nếu dữ liệu có tính chu kỳ (ví dụ: theo mùa), kích thước cửa sổ sliding window nên lớn hơn chu kỳ
Loại nghiệp vụ:
Tài chính: Xem xét Purging và Embargo
Dự báo nhu cầu: Multiple Temporal Splits có thể hiệu quả
Tài nguyên tính toán:
Hạn chế: Simple Forward Chaining hoặc TimeSeriesSplit với số fold nhỏ
Dồi dào: Sliding Window hoặc Expanding Window với nhiều bước nhỏ
Hiện tượng Concept Drift và Time-Series Splitting
Concept Drift là hiện tượng mối quan hệ giữa biến đầu vào và biến mục tiêu thay đổi theo thời gian. Đây là một thách thức lớn trong machine learning với dữ liệu chuỗi thời gian.
Các phương pháp Time-Series Splitting có thể giúp phát hiện và xử lý concept drift:
Phát hiện Concept Drift:
import numpy as np import pandas as pd import matplotlib.pyplot as plt from sklearn.metrics import mean_squared_error # Đánh giá hiệu suất mô hình theo thời gian def evaluate_performance_over_time(X, y, window_size=30, horizon=7, step=7): dates = X.index # Giả sử X có index là timestamps errors = [] timestamps = [] for i in range(window_size, len(X) - horizon, step): # Chia dữ liệu theo sliding window train_indices = list(range(i - window_size, i)) test_indices = list(range(i, i + horizon)) X_train, X_test = X.iloc[train_indices], X.iloc[test_indices] y_train, y_test = y.iloc[train_indices], y.iloc[test_indices] # Huấn luyện và dự đoán model = YourModel() model.fit(X_train, y_train) y_pred = model.predict(X_test) # Tính lỗi mse = mean_squared_error(y_test, y_pred) errors.append(mse) timestamps.append(dates[i]) # Vẽ biểu đồ lỗi theo thời gian plt.figure(figsize=(12, 6)) plt.plot(timestamps, errors) plt.title('Model Error Over Time') plt.xlabel('Date') plt.ylabel('MSE') plt.grid(True) # Phát hiện concept drift # Sự tăng đột biến lỗi có thể chỉ ra concept drift error_diff = np.diff(errors) significant_increase = error_diff > (np.mean(error_diff) + 2 * np.std(error_diff)) for i, is_increase in enumerate(significant_increase): if is_increase: plt.axvline(x=timestamps[i+1], color='r', linestyle='--') plt.show() return timestamps, errors, significant_increaseXử lý Concept Drift với Adaptive Time-Series Splitting:
class AdaptiveTimeSeriesSplit: def __init__(self, initial_window=60, min_window=30, error_threshold=0.1): self.initial_window = initial_window self.min_window = min_window self.error_threshold = error_threshold def split(self, X, y): n_samples = len(X) current_window = self.initial_window start_idx = 0 while start_idx + current_window < n_samples - 1: # Tìm điểm split tối ưu errors = [] for split_point in range(start_idx + self.min_window, start_idx + current_window): # Chia train-test train = list(range(start_idx, split_point)) test = [split_point] X_train, X_test = X.iloc[train], X.iloc[test] y_train, y_test = y.iloc[train], y.iloc[test] # Train model và đánh giá model = YourModel() model.fit(X_train, y_train) y_pred = model.predict(X_test) # Tính lỗi error = abs(y_test.values[0] - y_pred[0]) / (y_test.values[0] + 1e-10) errors.append((split_point, error)) # Tìm điểm có lỗi thấp nhất split_points, split_errors = zip(*errors) best_idx = np.argmin(split_errors) best_split = split_points[best_idx] best_error = split_errors[best_idx] # Nếu lỗi vượt ngưỡng, giảm cửa sổ if best_error > self.error_threshold: current_window = max(self.min_window, current_window // 2) # Trả về train-test indices train_indices = list(range(start_idx, best_split)) test_indices = [best_split] yield train_indices, test_indices # Di chuyển cửa sổ start_idx = best_split + 1
Đánh giá mô hình với Time-Series Splitting
Khi đánh giá mô hình dự báo chuỗi thời gian, cần xem xét nhiều metric khác nhau:
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
import numpy as np
def evaluate_time_series_model(y_true, y_pred):
"""Đánh giá mô hình với nhiều metric"""
results = {}
# Lỗi tuyệt đối trung bình
results['MAE'] = mean_absolute_error(y_true, y_pred)
# Lỗi bình phương trung bình
results['MSE'] = mean_squared_error(y_true, y_pred)
# Căn bậc hai của MSE
results['RMSE'] = np.sqrt(results['MSE'])
# Lỗi phần trăm tuyệt đối trung bình
# Tránh chia cho 0
if not np.any(y_true == 0):
results['MAPE'] = mean_absolute_percentage_error(y_true, y_pred) * 100
else:
# Thay thế 0 bằng giá trị rất nhỏ
y_true_safe = np.where(y_true == 0, 1e-10, y_true)
results['MAPE'] = mean_absolute_percentage_error(y_true_safe, y_pred) * 100
# Thêm metric cho hướng dự báo
direction_correct = np.sign(np.diff(y_true)) == np.sign(np.diff(y_pred))
results['Direction Accuracy'] = np.mean(direction_correct) * 100
return results
# Đánh giá mô hình trên nhiều cửa sổ thời gian
def evaluate_across_time_windows(X, y, model, method='expanding', window_size=60, step=30, horizon=7):
"""Đánh giá mô hình trên nhiều cửa sổ thời gian"""
dates = X.index # Giả sử X có index là timestamps
all_metrics = []
if method == 'expanding':
# Expanding window
for i in range(window_size, len(X) - horizon, step):
train_indices = list(range(0, i))
test_indices = list(range(i, i + horizon))
X_train, X_test = X.iloc[train_indices], X.iloc[test_indices]
y_train, y_test = y.iloc[train_indices], y.iloc[test_indices]
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
metrics = evaluate_time_series_model(y_test, y_pred)
metrics['train_start'] = dates[0]
metrics['train_end'] = dates[i-1]
metrics['test_start'] = dates[i]
metrics['test_end'] = dates[i+horizon-1]
all_metrics.append(metrics)
elif method == 'sliding':
# Sliding window
for i in range(window_size, len(X) - horizon, step):
train_indices = list(range(i - window_size, i))
test_indices = list(range(i, i + horizon))
X_train, X_test = X.iloc[train_indices], X.iloc[test_indices]
y_train, y_test = y.iloc[train_indices], y.iloc[test_indices]
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
metrics = evaluate_time_series_model(y_test, y_pred)
metrics['train_start'] = dates[i-window_size]
metrics['train_end'] = dates[i-1]
metrics['test_start'] = dates[i]
metrics['test_end'] = dates[i+horizon-1]
all_metrics.append(metrics)
return pd.DataFrame(all_metrics)
# Sử dụng hàm đánh giá
metrics_df = evaluate_across_time_windows(X, y, model, method='sliding')Ứng dụng Time-Series Splitting trong các lĩnh vực khác nhau
1. Dự báo tài chính
Trong tài chính, dữ liệu thường có nhiễu cao và xu hướng thị trường có thể thay đổi đột ngột. Sliding window thường được ưa chuộng hơn vì nó tập trung vào thông tin gần đây.
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_percentage_error
import matplotlib.pyplot as plt
# Ví dụ với dữ liệu chứng khoán
stock_data = pd.read_csv('stock_data.csv', parse_dates=['date'], index_col='date')
# Feature engineering
stock_data['return_1d'] = stock_data['close'].pct_change(1)
stock_data['return_5d'] = stock_data['close'].pct_change(5)
stock_data['volume_change'] = stock_data['volume'].pct_change(1)
stock_data['ma_10'] = stock_data['close'].rolling(10).mean()
stock_data['ma_30'] = stock_data['close'].rolling(30).mean()
stock_data['volatility'] = stock_data['return_1d'].rolling(20).std()
# Tạo target: giá sau 5 ngày
stock_data['target'] = stock_data['close'].shift(-5)
# Loại bỏ NaN
stock_data = stock_data.dropna()
# Định nghĩa features và target
features = ['return_1d', 'return_5d', 'volume_change', 'ma_10', 'ma_30', 'volatility']
X = stock_data[features]
y = stock_data['target']
# Thiết lập tham số
window_size = 252 # ~1 năm giao dịch
horizon = 5 # Dự báo 5 ngày
step = 20 # Trượt 20 ngày 1 lần
# Chuẩn bị danh sách kết quả
results = []
# Sliding window backtesting
for i in range(window_size, len(X) - horizon, step):
# Tập huấn luyện là cửa sổ có kích thước cố định
train_indices = list(range(i - window_size, i))
# Tập kiểm tra là horizon tiếp theo
test_indices = list(range(i, i + horizon))
# Tách dữ liệu
X_train, X_test = X.iloc[train_indices], X.iloc[test_indices]
y_train, y_test = y.iloc[train_indices], y.iloc[test_indices]
# Huấn luyện mô hình
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Dự đoán và đánh giá
y_pred = model.predict(X_test)
mape = mean_absolute_percentage_error(y_test, y_pred) * 100
# Lưu kết quả
test_date = X.index[i]
results.append({
'test_date': test_date,
'actual': y_test.mean(),
'predicted': np.mean(y_pred),
'mape': mape
})
# Chuyển kết quả thành DataFrame
results_df = pd.DataFrame(results)
# Vẽ biểu đồ MAPE theo thời gian
plt.figure(figsize=(12, 6))
plt.plot(results_df['test_date'], results_df['mape'])
plt.axhline(y=results_df['mape'].mean(), color='r', linestyle='--', label=f'Mean MAPE: {results_df["mape"].mean():.2f}%')
plt.title('Prediction Error Over Time')
plt.xlabel('Date')
plt.ylabel('MAPE (%)')
plt.legend()
plt.grid(True)
plt.show()2. Dự báo nhu cầu (Demand Forecasting)
Dự báo nhu cầu thường có tính chu kỳ mạnh (ngày, tuần, tháng, năm), nên cần sử dụng cửa sổ huấn luyện đủ lớn để bao quát toàn bộ chu kỳ.
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error
import matplotlib.pyplot as plt
# Đọc dữ liệu bán hàng
sales_data = pd.read_csv('sales_data.csv', parse_dates=['date'], index_col='date')
# Feature engineering cho dữ liệu bán hàng
sales_data['day_of_week'] = sales_data.index.dayofweek
sales_data['month'] = sales_data.index.month
sales_data['quarter'] = sales_data.index.quarter
sales_data['year'] = sales_data.index.year
sales_data['is_weekend'] = (sales_data['day_of_week'] >= 5).astype(int)
sales_data['sales_lag_1w'] = sales_data['sales'].shift(7)
sales_data['sales_lag_2w'] = sales_data['sales'].shift(14)
sales_data['sales_lag_1m'] = sales_data['sales'].shift(30)
sales_data['sales_lag_1y'] = sales_data['sales'].shift(365)
sales_data['sales_rolling_7d'] = sales_data['sales'].rolling(7).mean()
sales_data['sales_rolling_30d'] = sales_data['sales'].rolling(30).mean()
# Loại bỏ NaN
sales_data = sales_data.dropna()
# Định nghĩa features và target
features = ['day_of_week', 'month', 'quarter', 'year', 'is_weekend',
'sales_lag_1w', 'sales_lag_2w', 'sales_lag_1m', 'sales_lag_1y',
'sales_rolling_7d', 'sales_rolling_30d']
X = sales_data[features]
y = sales_data['sales']
# Thiết lập tham số cho expanding window
initial_window = 365*2 # 2 năm dữ liệu ban đầu
horizon = 30 # Dự báo 30 ngày
step = 30 # Bước nhảy 30 ngày
# Expanding window evaluation
results = []
for i in range(initial_window, len(X) - horizon, step):
# Tập huấn luyện mở rộng dần
train_indices = list(range(0, i))
# Tập kiểm tra là horizon tiếp theo
test_indices = list(range(i, i + horizon))
# Tách dữ liệu
X_train, X_test = X.iloc[train_indices], X.iloc[test_indices]
y_train, y_test = y.iloc[train_indices], y.iloc[test_indices]
# Huấn luyện mô hình
model = GradientBoostingRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Dự đoán và đánh giá
y_pred = model.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
# Lưu kết quả
test_date = X.index[i]
results.append({
'test_date': test_date,
'train_size': len(train_indices),
'mae': mae,
'relative_mae': mae / y_test.mean()
})
# Phân tích kết quả
results_df = pd.DataFrame(results)
print(f"Average MAE: {results_df['mae'].mean():.2f}")
print(f"Average relative MAE: {results_df['relative_mae'].mean()*100:.2f}%")
# Vẽ biểu đồ lỗi và kích thước tập huấn luyện
fig, ax1 = plt.subplots(figsize=(12, 6))
# Trục cho MAE
color = 'tab:blue'
ax1.set_xlabel('Test Date')
ax1.set_ylabel('MAE', color=color)
ax1.plot(results_df['test_date'], results_df['mae'], color=color)
ax1.tick_params(axis='y', labelcolor=color)
# Trục thứ hai cho kích thước tập huấn luyện
ax2 = ax1.twinx()
color = 'tab:red'
ax2.set_ylabel('Train Size', color=color)
ax2.plot(results_df['test_date'], results_df['train_size'], color=color, linestyle='--')
ax2.tick_params(axis='y', labelcolor=color)
plt.title('MAE and Training Size over Time (Expanding Window)')
fig.tight_layout()
plt.show()3. Dữ liệu cảm biến IoT (IoT Sensor Data)
Dữ liệu cảm biến IoT thường có độ phân giải cao và concept drift do thay đổi môi trường, nên sliding window với cửa sổ nhỏ thường hiệu quả hơn.
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
# Đọc dữ liệu cảm biến
sensor_data = pd.read_csv('sensor_data.csv', parse_dates=['timestamp'], index_col='timestamp')
# Feature engineering
sensor_data['hour'] = sensor_data.index.hour
sensor_data['day_of_week'] = sensor_data.index.dayofweek
sensor_data['temp_lag_1h'] = sensor_data['temperature'].shift(1)
sensor_data['temp_lag_3h'] = sensor_data['temperature'].shift(3)
sensor_data['temp_lag_6h'] = sensor_data['temperature'].shift(6)
sensor_data['temp_diff_1h'] = sensor_data['temperature'].diff(1)
sensor_data['temp_rolling_3h'] = sensor_data['temperature'].rolling(3).mean()
sensor_data['humidity_lag_1h'] = sensor_data['humidity'].shift(1)
sensor_data['pressure_lag_1h'] = sensor_data['pressure'].shift(1)
# Loại bỏ NaN
sensor_data = sensor_data.dropna()
# Định nghĩa features và target
features = ['hour', 'day_of_week', 'temp_lag_1h', 'temp_lag_3h', 'temp_lag_6h',
'temp_diff_1h', 'temp_rolling_3h', 'humidity_lag_1h', 'pressure_lag_1h']
X = sensor_data[features]
y = sensor_data['temperature']
# Thiết lập tham số cho sliding window
window_size = 24*7 # 1 tuần dữ liệu
horizon = 6 # Dự báo 6 giờ tiếp theo
step = 6 # Trượt 6 giờ một lần
# Sliding window evaluation với pipeline
results = []
for i in range(window_size, len(X) - horizon, step):
# Tập huấn luyện là cửa sổ trượt
train_indices = list(range(i - window_size, i))
# Tập kiểm tra là horizon tiếp theo
test_indices = list(range(i, i + horizon))
# Tách dữ liệu
X_train, X_test = X.iloc[train_indices], X.iloc[test_indices]
y_train, y_test = y.iloc[train_indices], y.iloc[test_indices]
# Tạo và huấn luyện pipeline (scaling + model)
pipe = Pipeline([
('scaler', StandardScaler()),
('ridge', Ridge(alpha=1.0))
])
pipe.fit(X_train, y_train)
# Dự đoán và đánh giá
y_pred = pipe.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
# Lưu kết quả
test_timestamp = X.index[i]
results.append({
'test_timestamp': test_timestamp,
'rmse': rmse
})
# Chuyển kết quả thành DataFrame
results_df = pd.DataFrame(results)
# Vẽ biểu đồ RMSE theo thời gian
plt.figure(figsize=(12, 6))
plt.plot(results_df['test_timestamp'], results_df['rmse'])
plt.title('Prediction Error (RMSE) Over Time for IoT Sensor Data')
plt.xlabel('Timestamp')
plt.ylabel('RMSE (°C)')
plt.grid(True)
# Phát hiện concept drift
# Tính ngưỡng cho RMSE bất thường (ví dụ: trung bình + 2*độ lệch chuẩn)
rmse_threshold = results_df['rmse'].mean() + 2 * results_df['rmse'].std()
drift_points = results_df[results_df['rmse'] > rmse_threshold]
# Đánh dấu các điểm có concept drift
plt.scatter(drift_points['test_timestamp'], drift_points['rmse'], color='red',
label=f'Potential Concept Drift ({len(drift_points)} points)')
plt.axhline(y=rmse_threshold, color='r', linestyle='--',
label=f'Drift Threshold: {rmse_threshold:.4f}')
plt.legend()
plt.show()Các thư viện hỗ trợ Time-Series Splitting
1. Scikit-learn
from sklearn.model_selection import TimeSeriesSplit2. Cross-Validation cho Time Series với Sktime
Sktime là một thư viện Python chuyên cho dữ liệu chuỗi thời gian, cung cấp nhiều công cụ cho time-series splitting.
# Cài đặt: pip install sktime
from sktime.forecasting.model_selection import ExpandingWindowSplitter, SlidingWindowSplitter
import numpy as np
# Tạo dữ liệu ví dụ
np.random.seed(42)
time_index = pd.date_range(start='2020-01-01', periods=100, freq='D')
y = pd.Series(np.random.randn(100).cumsum(), index=time_index)
# Expanding window splitter
expanding_splitter = ExpandingWindowSplitter(
initial_window=30, # Kích thước cửa sổ ban đầu
step_length=10, # Mỗi lần tăng 10 điểm dữ liệu
fh=np.arange(1, 8) # Dự báo 7 ngày tiếp theo
)
# Sliding window splitter
sliding_splitter = SlidingWindowSplitter(
window_length=30, # Kích thước cửa sổ cố định
step_length=10, # Trượt 10 điểm mỗi lần
fh=np.arange(1, 8) # Dự báo 7 ngày tiếp theo
)
# Minh họa các cửa sổ
for train, test in expanding_splitter.split(y):
print(f"Expanding: Train size: {len(train)}, Test indices: {test}")
for train, test in sliding_splitter.split(y):
print(f"Sliding: Train size: {len(train)}, Test indices: {test}")3. Các công cụ nâng cao từ Tslearn
Tslearn cung cấp các công cụ cho phân tích chuỗi thời gian.
# Cài đặt: pip install tslearn
from tslearn.utils import to_time_series_dataset
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn.clustering import TimeSeriesKMeans
import numpy as np
# Tạo dữ liệu mẫu cho time series clustering
np.random.seed(42)
n_series = 50
n_timestamps = 100
dataset = to_time_series_dataset([np.random.randn(n_timestamps) for _ in range(n_series)])
# Chuẩn hóa dữ liệu
scaler = TimeSeriesScalerMeanVariance()
dataset_scaled = scaler.fit_transform(dataset)
# Áp dụng K-means clustering
km = TimeSeriesKMeans(n_clusters=3, metric="dtw", random_state=42)
labels = km.fit_predict(dataset_scaled)
# Đếm số lượng time series trong mỗi cluster
for i in range(3):
print(f"Cluster {i}: {np.sum(labels == i)} time series")Kết luận
Time-Series Splitting là một kỹ thuật quan trọng trong quá trình đánh giá và huấn luyện mô hình dự báo chuỗi thời gian. Khác với các phương pháp chia dữ liệu truyền thống, nó đảm bảo tính toàn vẹn thời gian và mô phỏng chính xác cách mô hình sẽ được sử dụng trong thực tế.
Các phương pháp chính:
Simple Forward Chaining: Đơn giản, phù hợp cho đánh giá nhanh
Expanding Window: Tận dụng tất cả dữ liệu lịch sử có sẵn
Sliding Window: Tập trung vào xu hướng gần đây, xử lý concept drift
TimeSeriesSplit từ scikit-learn: Tiện lợi, tích hợp sẵn với các công cụ khác
Các phương pháp nâng cao: Purging, Embargo, Adaptive Splitting
Khi làm việc với dữ liệu chuỗi thời gian, việc lựa chọn phương pháp splitting phù hợp là bước quan trọng để xây dựng mô hình dự báo đáng tin cậy và có khả năng tổng quát hóa tốt trong thực tế.
Last updated