Prétraitement
1. Chargement des Données
1import pandas as pd
2
3# Chargement du dataset Bitcoin complet
4data = pd.read_csv('/content/drive/MyDrive/timeSerie/Final_Bitcoin_dataset.csv', parse_dates=['Date'])
5
6# Conversion des dates et mise en place comme index
7data.set_index('Date', inplace=True)
8
9print(f"Dataset chargé avec {data.shape[0]} observations et {data.shape[1]} caractéristiques")
10print(data.head())
2. Feature Engineering
Calcul des indicateurs techniques
1# Volatilité intra-journée
2data['Volatility'] = (data['High'] - data['Low']) / data['Low'] * 100
3
4# Moving Average Convergence Divergence (MACD)
5data['MACD'] = data['Close'].ewm(span=12).mean() - data['Close'].ewm(span=26).mean()
6
7# Création de décalages temporels pour l'indice Fear & Greed
8for lag in [1, 3, 7, 15, 30, 60]:
9 data[f'fear_greed_lag_{lag}'] = data['fear_greed'].shift(lag)
Analyse de corrélation des décalages
1for lag in [1, 3, 7, 15, 30, 60]:
2 correlation = data[['Close', f'fear_greed_lag_{lag}']].corr().iloc[0,1]
3 print(f"Corrélation avec un décalage de {lag} jours: {correlation:.4f}")
3. Analyse Exploratoire
Analyse Univariée
1import matplotlib.pyplot as plt
2import seaborn as sns
3
4# Distribution des prix de clôture
5plt.figure(figsize=(12, 6))
6sns.histplot(data['Close'], kde=True)
7plt.title('Distribution des Prix de Clôture Bitcoin')
8plt.xlabel('Prix (USD)')
9plt.ylabel('Fréquence')
10plt.show()
11
12# Analyse des séries temporelles
13fig, ax = plt.subplots(3, 1, figsize=(12, 10))
14data['Close'].plot(ax=ax[0], title='Prix de Clôture Bitcoin')
15data['Volatility'].plot(ax=ax[1], title='Volatilité Intra-journée')
16data['MACD'].plot(ax=ax[2], title='Indicateur MACD')
17plt.tight_layout()
Analyse Bivariée
1# Relation entre prix et volume
2plt.figure(figsize=(10, 6))
3sns.scatterplot(x='Volume', y='Close', data=data)
4plt.title('Relation Prix/Volume')
5plt.show()
6
7# Matrice de corrélation
8corr_matrix = data.corr()
9plt.figure(figsize=(14, 10))
10sns.heatmap(corr_matrix, annot=True, fmt=".2f", cmap='coolwarm')
11plt.title('Matrice de Corrélation')
12plt.show()
Tests Statistiques (KPSS, ACF, PACF)
1from statsmodels.tsa.stattools import adfuller, kpss
2from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
3
4# Test ADF pour la stationnarité
5adf_result = adfuller(data['Close'])
6print(f'ADF Statistic: {adf_result[0]}')
7print(f'p-value: {adf_result[1]}')
8
9# Test KPSS
10kpss_result = kpss(data['Close'])
11print(f'KPSS Statistic: {kpss_result[0]}')
12print(f'p-value: {kpss_result[1]}')
13
14# ACF et PACF
15fig, ax = plt.subplots(2, 1, figsize=(12, 8))
16plot_acf(data['Close'], lags=30, ax=ax[0])
17plot_pacf(data['Close'], lags=30, ax=ax[1])
18plt.tight_layout()
4. Pipeline de Prétraitement
Classe de Création de Séquences
1import numpy as np
2from sklearn.base import BaseEstimator, TransformerMixin
3
4class SequenceTransformer(BaseEstimator, TransformerMixin):
5 """Transforme les données en séquences temporelles pour les modèles RNN"""
6
7 def __init__(self, seq_length=60, target_idx=0):
8 self.seq_length = seq_length
9 self.target_idx = target_idx
10
11 def fit(self, X, y=None):
12 return self
13
14 def transform(self, X):
15 X_seq, y_seq = [], []
16 for i in range(len(X) - self.seq_length):
17 X_seq.append(X[i:i+self.seq_length])
18 y_seq.append(X[i+self.seq_length, self.target_idx])
19 return np.array(X_seq), np.array(y_seq)
Pipeline Complet
1from sklearn.preprocessing import RobustScaler
2from sklearn.pipeline import Pipeline
3
4def create_pipeline(target_idx=0, seq_length=60):
5 """Crée le pipeline de prétraitement complet"""
6 return Pipeline([
7 ('scaler', RobustScaler()), # Normalisation robuste aux outliers
8 ('sequencer', SequenceTransformer(seq_length=seq_length, target_idx=target_idx))
9 ])
5. Architecture du Modèle
1from tensorflow.keras.models import Sequential
2from tensorflow.keras.layers import LSTM, GRU, Dense, Dropout, Conv1D, MaxPooling1D, Bidirectional
3
4def build_advanced_model(input_shape, model_type='LSTM', units=64, n_layers=2, dropout=0.3):
5 """Construit une architecture de modèle avancée pour séries temporelles"""
6 model = Sequential()
7
8 # Couche d'entrée
9 if model_type == 'CNN-LSTM':
10 model.add(Conv1D(filters=units, kernel_size=3, activation='relu',
11 padding='causal', input_shape=input_shape))
12 model.add(MaxPooling1D(2))
13 model.add(LSTM(units//2, return_sequences=(n_layers>1)))
14 elif model_type == 'BiLSTM':
15 model.add(Bidirectional(LSTM(units, return_sequences=(n_layers>1)),
16 input_shape=input_shape))
17 else: # LSTM standard
18 model.add(LSTM(units, return_sequences=(n_layers>1), input_shape=input_shape))
19
20 # Couches cachées
21 for i in range(n_layers - 1):
22 model.add(LSTM(units, return_sequences=(i < n_layers-2)))
23 model.add(Dropout(dropout))
24
25 # Couches de sortie
26 model.add(Dense(64, activation='relu'))
27 model.add(Dense(1)) # Sortie pour la prédiction de prix
28
29 return model
6. Optimisation des Hyperparamètres avec Optuna
1import optuna
2import tensorflow as tf
3
4def objective(trial):
5 """Fonction d'objectif pour l'optimisation Optuna"""
6 # Espace de recherche des hyperparamètres
7 params = {
8 'units': trial.suggest_int('units', 32, 256),
9 'n_layers': trial.suggest_int('n_layers', 1, 4),
10 'dropout': trial.suggest_float('dropout', 0.1, 0.5),
11 'model_type': trial.suggest_categorical('model_type', ['LSTM', 'BiLSTM', 'CNN-LSTM']),
12 'lr': trial.suggest_float('lr', 1e-5, 1e-3, log=True)
13 }
14
15 lr = params.pop('lr') # Taux d'apprentissage séparé
16
17 # Construction du modèle
18 model = build_advanced_model(
19 input_shape=(SEQ_LENGTH, X_train.shape[2]),
20 **params
21 )
22
23 # Compilation
24 model.compile(
25 optimizer=tf.keras.optimizers.Adam(lr),
26 loss=tf.keras.losses.Huber(), # Perte robuste aux outliers
27 metrics=['mae'] # Erreur Moyenne Absolue
28 )
29
30 # Callbacks pour améliorer la formation
31 callbacks = [
32 EarlyStopping(patience=15, restore_best_weights=True),
33 ReduceLROnPlateau(factor=0.5, patience=5) # Réduction LR sur plateau
34 ]
35
36 # Entraînement
37 history = model.fit(
38 X_train, y_train,
39 validation_data=(X_val, y_val),
40 epochs=100,
41 batch_size=64,
42 verbose=0,
43 callbacks=callbacks
44 )
45
46 return min(history.history['val_mae']) # Minimiser la MAE de validation
7. Workflow Principal
1# Paramètres globaux
2SEQ_LENGTH = 60 # Longueur des séquences temporelles (2 mois)
3TARGET_COL = 'Close' # Variable cible
4
5# Création du pipeline
6target_idx = data.columns.get_loc(TARGET_COL)
7pipeline = create_pipeline(target_idx=target_idx, seq_length=SEQ_LENGTH)
8
9# Transformation des données
10X_seq, y_seq = pipeline.fit_transform(data.values)
11
12# Split temporel avec TimeSeriesSplit
13tscv = TimeSeriesSplit(n_splits=5)
14train_idx, val_idx = next(tscv.split(X_seq))
15X_train, X_val = X_seq[train_idx], X_seq[val_idx]
16y_train, y_val = y_seq[train_idx], y_seq[val_idx]
17
18# Optimisation Optuna
19study = optuna.create_study(direction='minimize')
20study.optimize(objective, n_trials=50) # 50 essais d'optimisation
21
22# Entraînement du modèle final
23best_params = study.best_params
24best_lr = best_params.pop('lr') # Extraction du taux d'apprentissage optimal
25
26# Construction du modèle final avec les meilleurs hyperparamètres
27final_model = build_advanced_model(
28 input_shape=(SEQ_LENGTH, X_train.shape[2]),
29 **best_params
30)
31
32# Compilation du modèle final
33final_model.compile(
34 optimizer=tf.keras.optimizers.Adam(learning_rate=best_lr),
35 loss=tf.keras.losses.Huber(),
36 metrics=['mae']
37)
38
39# Entraînement sur l'ensemble des données
40history = final_model.fit(
41 X_seq, y_seq,
42 epochs=200,
43 batch_size=128,
44 validation_split=0.2,
45 callbacks=[EarlyStopping(patience=20)] # Arrêt précoce
46)
47
48# Sauvegarde du modèle
49final_model.save('bitcoin_advanced_model.h5')
8. Post-traitement et Évaluation
Transformation Inverse
1# Récupération du scaler du pipeline
2scaler = pipeline.named_steps['scaler']
3n_features = data.shape[1] # Nombre de caractéristiques originales
4
5# Fonction de transformation inverse
6def inverse_scale(y_values):
7 dummy = np.zeros((len(y_values), n_features))
8 dummy[:, target_idx] = y_values.ravel()
9 return scaler.inverse_transform(dummy)[:, target_idx]
10
11# Application aux prédictions
12y_actual = inverse_scale(y_seq)
13y_pred = inverse_scale(final_model.predict(X_seq).flatten())
Nettoyage des Données
1# Vérification et suppression des NaN
2print("\nDiagnostic des données:")
3print(f"NaNs dans y_actual: {np.isnan(y_actual).sum()}")
4print(f"NaNs dans y_pred: {np.isnan(y_pred).sum()}")
5
6# Création du masque de filtrage
7mask = ~np.isnan(y_actual) & ~np.isnan(y_pred)
8
9# Filtrage des données
10y_actual_clean = y_actual[mask]
11y_pred_clean = y_pred[mask]
12dates_clean = data.index[SEQ_LENGTH:][mask]
13
14# Validation finale
15assert len(y_actual_clean) > 0, "Erreur critique: Aucune donnée valide après nettoyage!"
16print(f"Données valides conservées: {len(y_actual_clean)}/{len(y_actual)} points")
Visualisation des Résultats
1# Graphique comparatif
2plt.figure(figsize=(16, 8))
3plt.plot(dates_clean, y_actual_clean, label='Prix Réel', linewidth=2)
4plt.plot(dates_clean, y_pred_clean, label='Prédictions', linestyle='--', alpha=0.8)
5plt.title('Prédictions vs Réalité (Données Nettoyées)', fontsize=16)
6plt.xlabel('Date', fontsize=12)
7plt.ylabel('Prix (USD)', fontsize=12)
8plt.legend()
9plt.grid(True)
10plt.savefig('predictions_vs_reality.png') # Sauvegarde pour la documentation
11plt.show()
Calcul des Métriques de Performance
1from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
2
3# Calcul des métriques
4mae = mean_absolute_error(y_actual_clean, y_pred_clean)
5rmse = np.sqrt(mean_squared_error(y_actual_clean, y_pred_clean))
6
7# Affichage des résultats
8print("\nPerformance du Modèle:")
9print(f"MAE: {mae:.2f} USD")
10print(f"RMSE: {rmse:.2f} USD")
11print(f"Directionnal Accuracy : {DA:.2f} %")
12
13# Sauvegarde des résultats
14with open('model_performance.txt', 'w') as f:
15 f.write(f"MAE: {mae:.2f}\n")
16 f.write(f"RMSE: {rmse:.2f}\n")