shelfy-v2/internal/job/runner.go
2025-08-18 22:01:17 +02:00

592 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package runner
import (
"archive/zip"
"canguidev/shelfy/internal/client"
"canguidev/shelfy/internal/models"
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"gorm.io/gorm"
)
// DownloadJob is the GORM model describing one download: its source link,
// display name, destination path entry, size and current state.
type DownloadJob struct {
ID string `gorm:"primaryKey;column:id"`
Link string `gorm:"column:link"`
Name string `gorm:"column:name"`
Status string `gorm:"column:status"`
PathID int `gorm:"column:path_id"` // int, not uint
Size int64 `gorm:"column:size"`
Host string `gorm:"column:host"`
Progress int `gorm:"column:progress"` // int
StreamURL string `gorm:"column:stream_url"`
Speed int `gorm:"column:speed;default:0"` // speed in KB/s
CreatedAt time.Time `gorm:"autoCreateTime"`
UpdatedAt time.Time `gorm:"autoUpdateTime"`
}
// jobs is the in-memory table of download jobs, keyed by job ID.
// jobsMu guards every read and write of jobs.
var (
jobs = make(map[string]*DownloadJob)
jobsMu sync.Mutex
)
// GetFirstActiveAccount returns a pointer to a copy of the first debrid
// account that the client reports as active. It returns nil when the account
// list cannot be fetched (the error is logged) or when no account is active.
func GetFirstActiveAccount(client *client.Client) *client.DebridAccount {
	accounts, err := client.ListDebridAccounts(context.Background())
	if err != nil {
		log.Println("[ERROR] Impossible de récupérer les comptes :", err)
		return nil
	}

	for _, account := range accounts {
		if !account.IsActive {
			continue
		}
		found := account
		return &found
	}
	return nil
}
// RegisterJobWithDB persists job in the database and then records it in the
// in-memory job table.
//
// The row is looked up by ID through Unscoped, i.e. without GORM's soft-delete
// filter. When a row exists its columns are overwritten with the values from
// job; when none exists job is inserted. Any database error is logged and
// returned, in which case the in-memory table is left untouched.
func RegisterJobWithDB(job *DownloadJob, db *gorm.DB) error {
	var stored DownloadJob
	lookupErr := db.Unscoped().First(&stored, "id = ?", job.ID).Error

	switch {
	case lookupErr == nil:
		// The job already exists: refresh its columns.
		log.Printf("[INFO] Mise à jour du job existant : %s\n", job.ID)
		fields := map[string]interface{}{
			"link":       job.Link,
			"name":       job.Name,
			"status":     job.Status,
			"path_id":    job.PathID,
			"size":       job.Size,
			"host":       job.Host,
			"progress":   job.Progress,
			"stream_url": job.StreamURL,
			"updated_at": time.Now(),
		}
		if updateErr := db.Model(&stored).Updates(fields).Error; updateErr != nil {
			log.Printf("[ERROR] Échec de la mise à jour : %v\n", updateErr)
			return updateErr
		}

	case errors.Is(lookupErr, gorm.ErrRecordNotFound):
		// The job does not exist yet: insert it.
		if createErr := db.Create(job).Error; createErr != nil {
			log.Printf("[ERROR] Insertion échouée : %v\n", createErr)
			return createErr
		}
		log.Printf("[INFO] Nouveau job enregistré : %s\n", job.ID)

	default:
		// Unexpected lookup failure.
		log.Printf("[ERROR] Erreur inattendue lors de la recherche du job : %v\n", lookupErr)
		return lookupErr
	}

	// Mirror the job in memory.
	jobsMu.Lock()
	jobs[job.ID] = job
	jobsMu.Unlock()
	return nil
}
// InitJobsFromDB loads every job row from the database into the in-memory job
// table (meant to be called at startup) and logs how many jobs the table
// holds afterwards. A query error is returned without touching the table.
func InitJobsFromDB(db *gorm.DB) error {
	var rows []DownloadJob
	if res := db.Find(&rows); res.Error != nil {
		return res.Error
	}

	jobsMu.Lock()
	defer jobsMu.Unlock()

	for i := range rows {
		// Copy the element so that each map entry owns its own DownloadJob.
		entry := rows[i]
		jobs[entry.ID] = &entry
	}
	log.Printf("[JOB] %d jobs rechargés depuis la base\n", len(jobs))
	return nil
}
// UpdateJobStatus sets the status of the in-memory job identified by id,
// refreshes its UpdatedAt timestamp and, when db is non-nil, saves the job to
// the database. A failed save is logged rather than silently dropped. Unknown
// IDs are ignored. Broadcast is called in every case, while jobsMu is still
// held.
func UpdateJobStatus(id string, status string, db *gorm.DB) {
	jobsMu.Lock()
	defer jobsMu.Unlock()

	if job, ok := jobs[id]; ok {
		job.Status = status
		job.UpdatedAt = time.Now()
		if db != nil {
			if err := db.Save(job).Error; err != nil {
				log.Printf("[ERROR] Sauvegarde du statut du job %s échouée : %v\n", id, err)
			}
		}
	}
	Broadcast()
}
// UpdateJobProgress sets the progress of the in-memory job identified by id,
// refreshes its UpdatedAt timestamp and, when db is non-nil, saves the job to
// the database. A failed save is logged rather than silently dropped. Unknown
// IDs are ignored. Unlike UpdateJobStatus, it does not call Broadcast.
func UpdateJobProgress(id string, progress int, db *gorm.DB) {
	jobsMu.Lock()
	defer jobsMu.Unlock()

	job, ok := jobs[id]
	if !ok {
		return
	}
	job.Progress = progress
	job.UpdatedAt = time.Now()
	if db == nil {
		return
	}
	if err := db.Save(job).Error; err != nil {
		log.Printf("[ERROR] Sauvegarde de la progression du job %s échouée : %v\n", id, err)
	}
}
// DeleteJob removes the job identified by id from the in-memory table and then
// deletes its row from the database. The in-memory entry is dropped first, so
// it stays removed even when the database delete fails; that error is logged
// and returned.
func DeleteJob(id string, db *gorm.DB) error {
	// In-memory removal.
	jobsMu.Lock()
	delete(jobs, id)
	jobsMu.Unlock()

	// Database removal.
	res := db.Delete(&DownloadJob{}, "id = ?", id)
	if res.Error != nil {
		log.Printf("[ERROR] Échec de suppression du job en base : %v\n", res.Error)
		return res.Error
	}

	log.Printf("[JOB] Supprimé : %s\n", id)
	return nil
}
// ListJobs returns all jobs read from the database, ordered by created_at
// descending. When the query fails the error is logged and an empty, non-nil
// slice is returned.
func ListJobs(db *gorm.DB) []*DownloadJob {
	var found []*DownloadJob

	query := db.Order("created_at desc")
	if err := query.Find(&found).Error; err != nil {
		log.Printf("[ERROR] Impossible de charger les jobs depuis la base : %v\n", err)
		return make([]*DownloadJob, 0)
	}
	return found
}
// StartDownload downloads downloadURL for job into upload/<Path>, where Path
// is read from the models.PathDownload row referenced by job.PathID.
//
// A HEAD request provides the size and tells whether the server accepts byte
// ranges. Without range support, or when the file is smaller than the number
// of segments, the work is delegated to StartDownloadSingleThread. Otherwise
// the file is fetched as numSegments parallel range requests written to
// temporary part files, which are then concatenated into a uniquely named
// destination file. ".zip" and ".rar" results are extracted into the same
// directory; other non-video extensions are only logged.
//
// While segments are downloading, progress and speed (KB/s) are written to
// the database about once per second. The job ends in status "done" or
// "failed"; a failed segment or merge fails the whole job and removes the
// temporary files. Nothing is returned. The client parameter is not used.
func StartDownload(job *DownloadJob, downloadURL string, client *client.Client, db *gorm.DB) {
	UpdateJobStatus(job.ID, "downloading", db)

	// Resolve the destination entry (Path = "Film" / "Série" / ...).
	var p models.PathDownload
	if err := db.First(&p, job.PathID).Error; err != nil {
		log.Printf("[ERROR] Chemin de destination introuvable (path_id=%d) : %v\n", job.PathID, err)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}

	// Physical directory = upload/<Path>.
	diskBase := filepath.Join("upload", p.Path)
	if err := os.MkdirAll(diskBase, 0o755); err != nil {
		log.Printf("[ERROR] Création dossier '%s' : %v", diskBase, err)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}

	// HEAD request for the size and Accept-Ranges.
	resp, err := http.Head(downloadURL)
	if err != nil {
		log.Printf("[ERROR] Requête HEAD échouée : %v\n", err)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}
	// Only the headers are needed; close the body right away so the
	// connection is not leaked.
	resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Printf("[ERROR] Requête HEAD : statut inattendu %s\n", resp.Status)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}
	size := resp.ContentLength
	if size <= 0 {
		log.Printf("[ERROR] Taille du fichier inconnue (Content-Length=%d)\n", size)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}

	const numSegments = 4
	if strings.ToLower(resp.Header.Get("Accept-Ranges")) != "bytes" {
		log.Println("[INFO] Serveur ne supporte pas Range, fallback single thread")
		StartDownloadSingleThread(job, downloadURL, db, diskBase)
		return
	}
	if size < numSegments {
		// A zero-length segment would produce an invalid Range header.
		log.Println("[INFO] Fichier trop petit pour être segmenté, fallback single thread")
		StartDownloadSingleThread(job, downloadURL, db, diskBase)
		return
	}

	// Progress and speed reporter.
	progressChan := make(chan int64, 100)
	done := make(chan struct{})
	reporterStopped := make(chan struct{})
	go func() {
		defer close(reporterStopped)
		var downloaded, lastTotal int64
		lastUpdate := time.Now()
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case n := <-progressChan:
				downloaded += n
			case <-ticker.C:
				elapsed := time.Since(lastUpdate).Seconds()
				if elapsed <= 0 {
					continue
				}
				speed := int(float64(downloaded-lastTotal) / elapsed / 1024) // KB/s
				lastTotal = downloaded
				lastUpdate = time.Now()
				progress := int((downloaded * 100) / size)
				err := db.Model(&DownloadJob{}).Where("id = ?", job.ID).Updates(map[string]interface{}{
					"progress": progress,
					"speed":    speed,
				}).Error
				if err != nil {
					log.Printf("[ERROR] Mise à jour de la progression échouée : %v\n", err)
				}
				Broadcast()
			case <-done:
				return
			}
		}
	}()

	// Parallel segment download. Each goroutine writes only its own slot of
	// segErrs, and wg.Wait orders those writes before the reads below.
	segmentSize := size / numSegments
	tmpFiles := make([]string, numSegments)
	segErrs := make([]error, numSegments)
	var wg sync.WaitGroup
	for i := 0; i < numSegments; i++ {
		start := int64(i) * segmentSize
		end := start + segmentSize - 1
		if i == numSegments-1 {
			end = size - 1
		}
		tmpFiles[i] = filepath.Join(os.TempDir(), fmt.Sprintf("%v.part%d", job.ID, i))
		wg.Add(1)
		go func(idx int, start, end int64) {
			defer wg.Done()
			if err := downloadSegment(downloadURL, start, end, tmpFiles[idx], progressChan); err != nil {
				log.Printf("[ERROR] Segment %d-%d échoué : %v\n", start, end, err)
				segErrs[idx] = err
			}
		}(i, start, end)
	}
	wg.Wait()
	close(done)
	// Wait for the reporter so that a late progress write cannot overwrite
	// the final state stored below.
	<-reporterStopped

	removeParts := func() {
		for _, partPath := range tmpFiles {
			// Best-effort cleanup: the part may never have been created.
			_ = os.Remove(partPath)
		}
	}

	for _, segErr := range segErrs {
		if segErr != nil {
			// Merging an incomplete set of parts would yield a corrupt file.
			removeParts()
			UpdateJobStatus(job.ID, "failed", db)
			return
		}
	}

	// Merge into the final destination inside the physical directory.
	safeName := SanitizeFileName(job.Name)
	finalPath := generateUniqueFilePath(diskBase, safeName)
	if err := mergeSegmentFiles(finalPath, tmpFiles); err != nil {
		log.Printf("[ERROR] Fusion des segments échouée : %v\n", err)
		removeParts()
		// Best-effort removal of the incomplete output file.
		_ = os.Remove(finalPath)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}

	// Post-processing by extension; the merged file is closed at this point.
	ext := strings.ToLower(filepath.Ext(finalPath))
	videoExts := map[string]bool{".mkv": true, ".avi": true, ".mp4": true}
	if !videoExts[ext] {
		switch ext {
		case ".zip":
			if err := unzip(finalPath, diskBase); err != nil {
				log.Printf("[ERROR] Décompression ZIP échouée : %v\n", err)
				UpdateJobStatus(job.ID, "failed", db)
				return
			}
		case ".rar":
			if err := unrarExtract(finalPath, diskBase); err != nil {
				log.Printf("[ERROR] Décompression RAR échouée : %v\n", err)
				UpdateJobStatus(job.ID, "failed", db)
				return
			}
		default:
			log.Printf("[INFO] Extension non gérée : %s\n", ext)
		}
	}

	UpdateJobProgress(job.ID, 100, db)
	UpdateJobStatus(job.ID, "done", db)
	log.Printf("[OK] Fichier téléchargé : %s\n", finalPath)
}

// mergeSegmentFiles concatenates the part files, in order, into a new file at
// dest and deletes each part once it has been copied. It returns the first
// error met while creating, copying or closing the destination.
func mergeSegmentFiles(dest string, parts []string) error {
	out, err := os.Create(dest)
	if err != nil {
		return err
	}
	for _, partPath := range parts {
		if err := appendSegmentFile(out, partPath); err != nil {
			out.Close()
			return err
		}
		// Best-effort cleanup: a leftover part does not affect the result.
		_ = os.Remove(partPath)
	}
	return out.Close()
}

// appendSegmentFile copies the whole content of the file at partPath to out.
func appendSegmentFile(out io.Writer, partPath string) error {
	part, err := os.Open(partPath)
	if err != nil {
		return err
	}
	defer part.Close()
	_, err = io.Copy(out, part)
	return err
}
// generateUniqueFilePath returns a path for fileName inside basePath that
// os.Stat does not find. When basePath/fileName is already taken, a numeric
// suffix is inserted before the extension: "name (1).ext", "name (2).ext", ...
//
// Any Stat error, not only "not exist", ends the search: a candidate that
// cannot be inspected (for example because basePath is not a directory) is
// returned as is, so the caller's own file operation reports the real
// problem. Testing only for "not exist" would loop forever in that case.
func generateUniqueFilePath(basePath, fileName string) string {
	candidate := filepath.Join(basePath, fileName)
	if _, err := os.Stat(candidate); err != nil {
		return candidate
	}

	ext := filepath.Ext(fileName)
	stem := strings.TrimSuffix(fileName, ext)
	for counter := 1; ; counter++ {
		candidate = filepath.Join(basePath, fmt.Sprintf("%s (%d)%s", stem, counter, ext))
		if _, err := os.Stat(candidate); err != nil {
			return candidate
		}
	}
}
// StartDownloadSingleThread downloads downloadURL for job with a single GET
// request and streams the body to a file under basePath.
//
// basePath may be a logical name such as "Film", which is then prefixed with
// "upload"; a path that already is or starts with "upload", or an absolute
// path, is used as given. The directory is created when missing. The
// destination file name is the sanitized job name, made unique with
// generateUniqueFilePath so that an existing file is never overwritten.
//
// Progress is stored at most every 500 ms when the total size is known (from
// Content-Length, or else job.Size). The job ends in status "done" or
// "failed"; on a failure after the file was created, the partial file is
// removed. Nothing is returned.
func StartDownloadSingleThread(job *DownloadJob, downloadURL string, db *gorm.DB, basePath string) {
	UpdateJobStatus(job.ID, "running", db)

	// Normalise the on-disk directory.
	diskBase := basePath
	clean := filepath.Clean(basePath)
	if !filepath.IsAbs(clean) &&
		clean != "upload" &&
		!strings.HasPrefix(clean, "upload"+string(os.PathSeparator)) {
		diskBase = filepath.Join("upload", clean)
	}

	if err := os.MkdirAll(diskBase, 0o755); err != nil {
		log.Printf("[ERROR] Création du dossier %s échouée : %v\n", diskBase, err)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}

	resp, err := http.Get(downloadURL)
	if err != nil {
		log.Printf("[ERROR] Téléchargement échoué : %v\n", err)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Printf("[ERROR] Erreur HTTP : %s\n", resp.Status)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}

	// Same unique naming as the segmented download, so an earlier file with
	// the same name is kept.
	destPath := generateUniqueFilePath(diskBase, SanitizeFileName(job.Name))
	outFile, err := os.Create(destPath)
	if err != nil {
		log.Printf("[ERROR] Impossible de créer le fichier : %v\n", err)
		UpdateJobStatus(job.ID, "failed", db)
		return
	}

	// abort discards the partial output and marks the job as failed.
	abort := func() {
		outFile.Close()
		// Best-effort removal: destPath was created by this call.
		_ = os.Remove(destPath)
		UpdateJobStatus(job.ID, "failed", db)
	}

	// Total size used for the progress percentage.
	totalSize := resp.ContentLength
	if totalSize <= 0 && job.Size > 0 {
		totalSize = job.Size
	}

	buf := make([]byte, 32*1024) // 32 KiB
	var downloaded int64
	lastUpdate := time.Now()
	for {
		n, readErr := resp.Body.Read(buf)
		if n > 0 {
			if _, writeErr := outFile.Write(buf[:n]); writeErr != nil {
				log.Printf("[ERROR] Écriture échouée : %v\n", writeErr)
				abort()
				return
			}
			downloaded += int64(n)
		}
		if readErr != nil {
			if readErr == io.EOF {
				break
			}
			log.Printf("[ERROR] Erreur de lecture : %v\n", readErr)
			abort()
			return
		}
		// Store progress at most every 500 ms.
		if totalSize > 0 && time.Since(lastUpdate) > 500*time.Millisecond {
			progress := int((downloaded * 100) / totalSize)
			UpdateJobProgress(job.ID, progress, db)
			lastUpdate = time.Now()
		}
	}

	// A failed close can mean lost data, so check it before reporting success.
	if err := outFile.Close(); err != nil {
		log.Printf("[ERROR] Écriture échouée : %v\n", err)
		_ = os.Remove(destPath) // best-effort removal of the incomplete file
		UpdateJobStatus(job.ID, "failed", db)
		return
	}

	UpdateJobProgress(job.ID, 100, db)
	UpdateJobStatus(job.ID, "done", db)
	log.Printf("[OK] Fichier téléchargé (single) : %s\n", destPath)
}
// downloadSegment fetches bytes start..end (inclusive) of url with an HTTP
// Range request and writes them to a new file at dest. The size of each chunk
// written is sent on progressChan; that send blocks while the channel is full.
//
// It returns an error when the request cannot be built or sent, when the
// server does not answer 206 Partial Content (a 200 would carry the whole
// file and an error status an error page, neither of which is the requested
// segment), when the number of bytes received differs from the requested
// range, or when reading, writing or closing fails.
func downloadSegment(url string, start, end int64, dest string, progressChan chan<- int64) error {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusPartialContent {
		return fmt.Errorf("range request bytes=%d-%d: unexpected status %s", start, end, resp.Status)
	}

	out, err := os.Create(dest)
	if err != nil {
		return err
	}

	buf := make([]byte, 32*1024)
	var written int64
	for {
		n, readErr := resp.Body.Read(buf)
		if n > 0 {
			if _, writeErr := out.Write(buf[:n]); writeErr != nil {
				out.Close()
				return writeErr
			}
			written += int64(n)
			progressChan <- int64(n) // report progress
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			out.Close()
			return readErr
		}
	}

	if err := out.Close(); err != nil {
		return err
	}
	if want := end - start + 1; written != want {
		return fmt.Errorf("range request bytes=%d-%d: received %d bytes, want %d", start, end, written, want)
	}
	return nil
}
// unsafeFileNameChars matches any character other than an ASCII letter, a
// digit, an underscore, a hyphen or a dot. It is compiled once at package
// initialisation instead of on every SanitizeFileName call.
var unsafeFileNameChars = regexp.MustCompile(`[^\w\-.]`)

// SanitizeFileName returns name with every character outside [A-Za-z0-9_.-]
// replaced by an underscore. Non-ASCII characters are replaced too, one
// underscore per character.
func SanitizeFileName(name string) string {
	return unsafeFileNameChars.ReplaceAllString(name, "_")
}
// subscribers is the set of notification channels that Broadcast sends to.
// subscribersMu guards every read and write of subscribers.
var (
subscribers = make(map[chan struct{}]struct{})
subscribersMu sync.Mutex
)
// Subscribe creates a notification channel with a buffer of one, adds it to
// the subscriber set and returns it. Broadcast sends on it; Unsubscribe
// removes and closes it.
func Subscribe() chan struct{} {
	subscribersMu.Lock()
	defer subscribersMu.Unlock()

	notify := make(chan struct{}, 1)
	subscribers[notify] = struct{}{}
	return notify
}
// Unsubscribe removes ch from the subscriber set and closes it.
//
// The channel is closed only when it was still registered, so calling
// Unsubscribe twice for the same channel, or with a nil or unknown channel,
// is a no-op instead of a panic in close. The removal happens under
// subscribersMu, which Broadcast also holds while sending, so Broadcast never
// sends on a channel closed here.
func Unsubscribe(ch chan struct{}) {
	subscribersMu.Lock()
	_, registered := subscribers[ch]
	delete(subscribers, ch)
	subscribersMu.Unlock()

	if registered {
		close(ch)
	}
}
// Broadcast sends one notification to every subscriber without blocking: a
// subscriber whose channel cannot accept the value right now is skipped. The
// outcome for each subscriber is logged.
func Broadcast() {
	subscribersMu.Lock()
	defer subscribersMu.Unlock()

	for sub := range subscribers {
		delivered := false
		select {
		case sub <- struct{}{}:
			delivered = true
		default:
		}

		if delivered {
			log.Println("Broadcast envoyé à un client")
		} else {
			log.Println("Client bloqué, message ignoré")
		}
	}
}
// unzip extracts every entry of the ZIP archive srcZip beneath destDir,
// creating directories as needed and overwriting existing files.
//
// An entry whose name would resolve outside destDir (for example
// "../x", the "Zip Slip" attack) aborts the extraction with an error before
// anything is written for it. Entries extracted before the error stay on
// disk. The first error met is returned.
func unzip(srcZip, destDir string) error {
	r, err := zip.OpenReader(srcZip)
	if err != nil {
		return err
	}
	defer r.Close()

	for _, f := range r.File {
		fpath := filepath.Join(destDir, f.Name)

		// Reject entries that escape the destination directory.
		rel, err := filepath.Rel(destDir, fpath)
		if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) {
			return fmt.Errorf("zip entry %q escapes destination directory", f.Name)
		}

		if f.FileInfo().IsDir() {
			if err := os.MkdirAll(fpath, os.ModePerm); err != nil {
				return err
			}
			continue
		}
		if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
			return err
		}
		if err := extractZipFile(f, fpath); err != nil {
			return err
		}
	}
	return nil
}

// extractZipFile writes the content of the archive entry f to fpath using the
// entry's file mode. Both handles are closed before it returns, so a large
// archive does not accumulate open files.
func extractZipFile(f *zip.File, fpath string) error {
	in, err := f.Open()
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, in); err != nil {
		out.Close()
		return err
	}
	return out.Close()
}
// unrarExtract extracts the RAR archive srcRar into destDir by running
// external tools: first "unrar x -y", then "7z x -y" when unrar fails. Every
// step and each tool's combined output are logged.
//
// It returns nil as soon as one tool succeeds; when both fail, the returned
// error carries both errors and both outputs.
func unrarExtract(srcRar, destDir string) error {
	log.Printf("[DEBUG] Début de lextraction RAR src: %q, dest: %q", srcRar, destDir)

	// unrar treats its last argument as the extraction directory only when
	// it ends with a path separator; otherwise it is read as the name of a
	// file to extract from the archive.
	unrarDest := destDir
	if unrarDest != "" && !strings.HasSuffix(unrarDest, string(os.PathSeparator)) {
		unrarDest += string(os.PathSeparator)
	}

	// 1) Try unrar.
	cmdUnrar := exec.Command("unrar", "x", "-y", srcRar, unrarDest)
	log.Printf("[DEBUG] Exécution de la commande unrar : %s", strings.Join(cmdUnrar.Args, " "))
	outputUnrar, errUnrar := cmdUnrar.CombinedOutput()
	log.Printf("[DEBUG] Résultat unrar err: %v, output:\n%s", errUnrar, string(outputUnrar))
	if errUnrar == nil {
		log.Printf("[INFO] Extraction réussie avec unrar.")
		return nil
	}
	log.Printf("[WARN] Échec de unrar, passage à 7z.")

	// 2) Fall back to 7z.
	cmd7z := exec.Command("7z", "x", srcRar, "-y", "-o"+destDir)
	log.Printf("[DEBUG] Exécution de la commande 7z : %s", strings.Join(cmd7z.Args, " "))
	output7z, err7z := cmd7z.CombinedOutput()
	log.Printf("[DEBUG] Résultat 7z err: %v, output:\n%s", err7z, string(output7z))
	if err7z == nil {
		log.Printf("[INFO] Extraction réussie avec 7z.")
		return nil
	}

	// 3) Both tools failed.
	errMsg := fmt.Errorf(
		"unrar a échoué : %v\nSortie unrar : %s\n\n7z a échoué : %v\nSortie 7z : %s",
		errUnrar, string(outputUnrar),
		err7z, string(output7z),
	)
	log.Printf("[ERROR] %v", errMsg)
	return errMsg
}