Prétraitement

1. Chargement des Données

 1import pandas as pd
 2
 3# Chargement du dataset Bitcoin complet
 4data = pd.read_csv('/content/drive/MyDrive/timeSerie/Final_Bitcoin_dataset.csv', parse_dates=['Date'])
 5
 6# Conversion des dates et mise en place comme index
 7data.set_index('Date', inplace=True)
 8
 9print(f"Dataset chargé avec {data.shape[0]} observations et {data.shape[1]} caractéristiques")
10print(data.head())

2. Feature Engineering

Calcul des indicateurs techniques

1# Volatilité intra-journée
2data['Volatility'] = (data['High'] - data['Low']) / data['Low'] * 100
3
4# Moving Average Convergence Divergence (MACD)
5data['MACD'] = data['Close'].ewm(span=12).mean() - data['Close'].ewm(span=26).mean()
6
7# Création de décalages temporels pour l'indice Fear & Greed
8for lag in [1, 3, 7, 15, 30, 60]:
9    data[f'fear_greed_lag_{lag}'] = data['fear_greed'].shift(lag)

Analyse de corrélation des décalages

1for lag in [1, 3, 7, 15, 30, 60]:
2    correlation = data[['Close', f'fear_greed_lag_{lag}']].corr().iloc[0,1]
3    print(f"Corrélation avec un décalage de {lag} jours: {correlation:.4f}")

3. Analyse Exploratoire

Analyse Univariée

 1import matplotlib.pyplot as plt
 2import seaborn as sns
 3
 4# Distribution des prix de clôture
 5plt.figure(figsize=(12, 6))
 6sns.histplot(data['Close'], kde=True)
 7plt.title('Distribution des Prix de Clôture Bitcoin')
 8plt.xlabel('Prix (USD)')
 9plt.ylabel('Fréquence')
10plt.show()
11
12# Analyse des séries temporelles
13fig, ax = plt.subplots(3, 1, figsize=(12, 10))
14data['Close'].plot(ax=ax[0], title='Prix de Clôture Bitcoin')
15data['Volatility'].plot(ax=ax[1], title='Volatilité Intra-journée')
16data['MACD'].plot(ax=ax[2], title='Indicateur MACD')
17plt.tight_layout()

Analyse Bivariée

 1# Relation entre prix et volume
 2plt.figure(figsize=(10, 6))
 3sns.scatterplot(x='Volume', y='Close', data=data)
 4plt.title('Relation Prix/Volume')
 5plt.show()
 6
 7# Matrice de corrélation
 8corr_matrix = data.corr()
 9plt.figure(figsize=(14, 10))
10sns.heatmap(corr_matrix, annot=True, fmt=".2f", cmap='coolwarm')
11plt.title('Matrice de Corrélation')
12plt.show()

Tests Statistiques (KPSS, ACF, PACF)

 1from statsmodels.tsa.stattools import adfuller, kpss
 2from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
 3
 4# Test ADF pour la stationnarité
 5adf_result = adfuller(data['Close'])
 6print(f'ADF Statistic: {adf_result[0]}')
 7print(f'p-value: {adf_result[1]}')
 8
 9# Test KPSS
10kpss_result = kpss(data['Close'])
11print(f'KPSS Statistic: {kpss_result[0]}')
12print(f'p-value: {kpss_result[1]}')
13
14# ACF et PACF
15fig, ax = plt.subplots(2, 1, figsize=(12, 8))
16plot_acf(data['Close'], lags=30, ax=ax[0])
17plot_pacf(data['Close'], lags=30, ax=ax[1])
18plt.tight_layout()

4. Pipeline de Prétraitement

Classe de Création de Séquences

 1import numpy as np
 2from sklearn.base import BaseEstimator, TransformerMixin
 3
 4class SequenceTransformer(BaseEstimator, TransformerMixin):
 5    """Transforme les données en séquences temporelles pour les modèles RNN"""
 6
 7    def __init__(self, seq_length=60, target_idx=0):
 8        self.seq_length = seq_length
 9        self.target_idx = target_idx
10
11    def fit(self, X, y=None):
12        return self
13
14    def transform(self, X):
15        X_seq, y_seq = [], []
16        for i in range(len(X) - self.seq_length):
17            X_seq.append(X[i:i+self.seq_length])
18            y_seq.append(X[i+self.seq_length, self.target_idx])
19        return np.array(X_seq), np.array(y_seq)

Pipeline Complet

1from sklearn.preprocessing import RobustScaler
2from sklearn.pipeline import Pipeline
3
4def create_pipeline(target_idx=0, seq_length=60):
5    """Crée le pipeline de prétraitement complet"""
6    return Pipeline([
7        ('scaler', RobustScaler()),  # Normalisation robuste aux outliers
8        ('sequencer', SequenceTransformer(seq_length=seq_length, target_idx=target_idx))
9    ])

5. Architecture du Modèle

 1from tensorflow.keras.models import Sequential
 2from tensorflow.keras.layers import LSTM, GRU, Dense, Dropout, Conv1D, MaxPooling1D, Bidirectional
 3
 4def build_advanced_model(input_shape, model_type='LSTM', units=64, n_layers=2, dropout=0.3):
 5    """Construit une architecture de modèle avancée pour séries temporelles"""
 6    model = Sequential()
 7
 8    # Couche d'entrée
 9    if model_type == 'CNN-LSTM':
10        model.add(Conv1D(filters=units, kernel_size=3, activation='relu',
11                        padding='causal', input_shape=input_shape))
12        model.add(MaxPooling1D(2))
13        model.add(LSTM(units//2, return_sequences=(n_layers>1)))
14    elif model_type == 'BiLSTM':
15        model.add(Bidirectional(LSTM(units, return_sequences=(n_layers>1)),
16                              input_shape=input_shape))
17    else:  # LSTM standard
18        model.add(LSTM(units, return_sequences=(n_layers>1), input_shape=input_shape))
19
20    # Couches cachées
21    for i in range(n_layers - 1):
22        model.add(LSTM(units, return_sequences=(i < n_layers-2)))
23        model.add(Dropout(dropout))
24
25    # Couches de sortie
26    model.add(Dense(64, activation='relu'))
27    model.add(Dense(1))  # Sortie pour la prédiction de prix
28
29    return model

6. Optimisation des Hyperparamètres avec Optuna

 1import optuna
 2import tensorflow as tf
 3
 4def objective(trial):
 5    """Fonction d'objectif pour l'optimisation Optuna"""
 6    # Espace de recherche des hyperparamètres
 7    params = {
 8        'units': trial.suggest_int('units', 32, 256),
 9        'n_layers': trial.suggest_int('n_layers', 1, 4),
10        'dropout': trial.suggest_float('dropout', 0.1, 0.5),
11        'model_type': trial.suggest_categorical('model_type', ['LSTM', 'BiLSTM', 'CNN-LSTM']),
12        'lr': trial.suggest_float('lr', 1e-5, 1e-3, log=True)
13    }
14
15    lr = params.pop('lr')  # Taux d'apprentissage séparé
16
17    # Construction du modèle
18    model = build_advanced_model(
19        input_shape=(SEQ_LENGTH, X_train.shape[2]),
20        **params
21    )
22
23    # Compilation
24    model.compile(
25        optimizer=tf.keras.optimizers.Adam(lr),
26        loss=tf.keras.losses.Huber(),  # Perte robuste aux outliers
27        metrics=['mae']  # Erreur Moyenne Absolue
28    )
29
30    # Callbacks pour améliorer la formation
31    callbacks = [
32        EarlyStopping(patience=15, restore_best_weights=True),
33        ReduceLROnPlateau(factor=0.5, patience=5)  # Réduction LR sur plateau
34    ]
35
36    # Entraînement
37    history = model.fit(
38        X_train, y_train,
39        validation_data=(X_val, y_val),
40        epochs=100,
41        batch_size=64,
42        verbose=0,
43        callbacks=callbacks
44    )
45
46    return min(history.history['val_mae'])  # Minimiser la MAE de validation

7. Workflow Principal

 1# Paramètres globaux
 2SEQ_LENGTH = 60  # Longueur des séquences temporelles (2 mois)
 3TARGET_COL = 'Close'  # Variable cible
 4
 5# Création du pipeline
 6target_idx = data.columns.get_loc(TARGET_COL)
 7pipeline = create_pipeline(target_idx=target_idx, seq_length=SEQ_LENGTH)
 8
 9# Transformation des données
10X_seq, y_seq = pipeline.fit_transform(data.values)
11
12# Split temporel avec TimeSeriesSplit
13tscv = TimeSeriesSplit(n_splits=5)
14train_idx, val_idx = next(tscv.split(X_seq))
15X_train, X_val = X_seq[train_idx], X_seq[val_idx]
16y_train, y_val = y_seq[train_idx], y_seq[val_idx]
17
18# Optimisation Optuna
19study = optuna.create_study(direction='minimize')
20study.optimize(objective, n_trials=50)  # 50 essais d'optimisation
21
22# Entraînement du modèle final
23best_params = study.best_params
24best_lr = best_params.pop('lr')  # Extraction du taux d'apprentissage optimal
25
26# Construction du modèle final avec les meilleurs hyperparamètres
27final_model = build_advanced_model(
28    input_shape=(SEQ_LENGTH, X_train.shape[2]),
29    **best_params
30)
31
32# Compilation du modèle final
33final_model.compile(
34    optimizer=tf.keras.optimizers.Adam(learning_rate=best_lr),
35    loss=tf.keras.losses.Huber(),
36    metrics=['mae']
37)
38
39# Entraînement sur l'ensemble des données
40history = final_model.fit(
41    X_seq, y_seq,
42    epochs=200,
43    batch_size=128,
44    validation_split=0.2,
45    callbacks=[EarlyStopping(patience=20)]  # Arrêt précoce
46)
47
48# Sauvegarde du modèle
49final_model.save('bitcoin_advanced_model.h5')

8. Post-traitement et Évaluation

Transformation Inverse

 1# Récupération du scaler du pipeline
 2scaler = pipeline.named_steps['scaler']
 3n_features = data.shape[1]  # Nombre de caractéristiques originales
 4
 5# Fonction de transformation inverse
 6def inverse_scale(y_values):
 7    dummy = np.zeros((len(y_values), n_features))
 8    dummy[:, target_idx] = y_values.ravel()
 9    return scaler.inverse_transform(dummy)[:, target_idx]
10
11# Application aux prédictions
12y_actual = inverse_scale(y_seq)
13y_pred = inverse_scale(final_model.predict(X_seq).flatten())

Nettoyage des Données

 1# Vérification et suppression des NaN
 2print("\nDiagnostic des données:")
 3print(f"NaNs dans y_actual: {np.isnan(y_actual).sum()}")
 4print(f"NaNs dans y_pred: {np.isnan(y_pred).sum()}")
 5
 6# Création du masque de filtrage
 7mask = ~np.isnan(y_actual) & ~np.isnan(y_pred)
 8
 9# Filtrage des données
10y_actual_clean = y_actual[mask]
11y_pred_clean = y_pred[mask]
12dates_clean = data.index[SEQ_LENGTH:][mask]
13
14# Validation finale
15assert len(y_actual_clean) > 0, "Erreur critique: Aucune donnée valide après nettoyage!"
16print(f"Données valides conservées: {len(y_actual_clean)}/{len(y_actual)} points")

Visualisation des Résultats

 1# Graphique comparatif
 2plt.figure(figsize=(16, 8))
 3plt.plot(dates_clean, y_actual_clean, label='Prix Réel', linewidth=2)
 4plt.plot(dates_clean, y_pred_clean, label='Prédictions', linestyle='--', alpha=0.8)
 5plt.title('Prédictions vs Réalité (Données Nettoyées)', fontsize=16)
 6plt.xlabel('Date', fontsize=12)
 7plt.ylabel('Prix (USD)', fontsize=12)
 8plt.legend()
 9plt.grid(True)
10plt.savefig('predictions_vs_reality.png')  # Sauvegarde pour la documentation
11plt.show()

Calcul des Métriques de Performance

 1from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
 2
 3# Calcul des métriques
 4mae = mean_absolute_error(y_actual_clean, y_pred_clean)
 5rmse = np.sqrt(mean_squared_error(y_actual_clean, y_pred_clean))
 6
 7# Affichage des résultats
 8print("\nPerformance du Modèle:")
 9print(f"MAE: {mae:.2f} USD")
10print(f"RMSE: {rmse:.2f} USD")
11print(f"Directionnal Accuracy : {DA:.2f} %")
12
13# Sauvegarde des résultats
14with open('model_performance.txt', 'w') as f:
15    f.write(f"MAE: {mae:.2f}\n")
16    f.write(f"RMSE: {rmse:.2f}\n")