From b52729630a7255ce121abcf99c5cc8eb66fa2e23 Mon Sep 17 00:00:00 2001 From: cangui Date: Sun, 28 Sep 2025 15:44:15 +0200 Subject: [PATCH] fix mutli dowload --- internal/controllers/downloadController.go | 178 ++++++++++++++------- 1 file changed, 119 insertions(+), 59 deletions(-) diff --git a/internal/controllers/downloadController.go b/internal/controllers/downloadController.go index d4e3848..7351d03 100644 --- a/internal/controllers/downloadController.go +++ b/internal/controllers/downloadController.go @@ -25,7 +25,27 @@ var seriesRegex = regexp.MustCompile(`^(.+?)\.S\d{2}E\d{2}`) var ( jobs = make(map[string]*runner.DownloadJob) jobsMu sync.Mutex + // --- Ajouts --- + inFlightMu sync.Mutex + inFlight = map[string]struct{}{} // set des jobs déjà en cours (par id) + + parallelism = 4 // <— ajuste le nombre de DL en parallèle + sem = make(chan struct{}, parallelism) ) +func tryMarkInFlight(id string) bool { + inFlightMu.Lock() + defer inFlightMu.Unlock() + if _, ok := inFlight[id]; ok { + return false + } + inFlight[id] = struct{}{} + return true +} +func unmarkInFlight(id string) { + inFlightMu.Lock() + delete(inFlight, id) + inFlightMu.Unlock() +} func HandleAddJobsMultiple(db *gorm.DB) gin.HandlerFunc { return func(c *gin.Context) { // 1. Parsing des champs du formulaire @@ -167,43 +187,62 @@ func HandleAddJobsMultiple(db *gorm.DB) gin.HandlerFunc { func HandleStartJob(db *gorm.DB) gin.HandlerFunc { - return func(c *gin.Context) { - id := c.Param("id") - log.Printf("[StartJob] ID = %s", id) + return func(c *gin.Context) { + id := c.Param("id") + log.Printf("[StartJob] ID = %s", id) - // 1. Vérifie si le job est déjà en mémoire - jobsMu.Lock() - job, exists := jobs[id] - jobsMu.Unlock() + // Empêche un job déjà en cours d'être relancé + if !tryMarkInFlight(id) { + log.Printf("[StartJob] Job %s déjà en cours, on ignore", id) + c.Status(http.StatusNoContent) + return + } - // 2. Sinon, récupère depuis la BDD - if !exists { - var j runner.DownloadJob - if err := db.First(&j, "id = ?", id).Error; err != nil { - c.JSON(http.StatusNotFound, gin.H{"error": "Job introuvable"}) - return - } - jobCopy := j - jobsMu.Lock() - jobs[id] = &jobCopy - job = &jobCopy - jobsMu.Unlock() - } + // Récupération du job depuis la mémoire ou la DB + jobsMu.Lock() + job, exists := jobs[id] + jobsMu.Unlock() + if !exists { + var j runner.DownloadJob + if err := db.First(&j, "id = ?", id).Error; err != nil { + unmarkInFlight(id) + c.JSON(http.StatusNotFound, gin.H{"error": "Job introuvable"}) + return + } + jobCopy := j + jobsMu.Lock() + jobs[id] = &jobCopy + job = &jobCopy + jobsMu.Unlock() + } - clt:= client.NewClient(db) - account := runner.GetFirstActiveAccount(clt) - if account == nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "Aucun compte Debrid-Link actif"}) - return - } - clt.SetAccount(account) + // Init client Debrid-Link + clt := client.NewClient(db) + account := runner.GetFirstActiveAccount(clt) + if account == nil { + unmarkInFlight(id) + c.JSON(http.StatusBadRequest, gin.H{"error": "Aucun compte Debrid-Link actif"}) + return + } + clt.SetAccount(account) - go runner.StartDownload(job, job.Link, clt, db) - runner.Broadcast() + // Lancement asynchrone avec limite de parallélisme + go func(job *runner.DownloadJob) { + sem <- struct{}{} // bloque si trop de jobs en cours + defer func() { + <-sem + unmarkInFlight(job.ID) + runner.Broadcast() + }() - c.Status(http.StatusNoContent) - } + runner.StartDownload(job, job.Link, clt, db) + }(job) + + runner.Broadcast() + c.Status(http.StatusNoContent) + } } + func HandlePauseJob() gin.HandlerFunc { return func(c *gin.Context) { id := c.Param("id") @@ -214,39 +253,60 @@ func HandlePauseJob() gin.HandlerFunc { } } func HandleResumeJob(db *gorm.DB) gin.HandlerFunc { - return func(c *gin.Context) { - id := c.Param("id") + return func(c *gin.Context) { + id := c.Param("id") + log.Printf("[ResumeJob] ID = %s", id) - jobsMu.Lock() - job, exists := jobs[id] - jobsMu.Unlock() + // Empêche un job déjà en cours d'être relancé + if !tryMarkInFlight(id) { + log.Printf("[ResumeJob] Job %s déjà en cours, on ignore", id) + c.Status(http.StatusNoContent) + return + } - if !exists { - var j runner.DownloadJob - if err := db.First(&j, "id = ?", id).Error; err != nil { - c.JSON(http.StatusNotFound, gin.H{"error": "Job introuvable"}) - return - } - jobCopy := j - jobsMu.Lock() - jobs[id] = &jobCopy - job = &jobCopy - jobsMu.Unlock() - } + // Récupération du job depuis la mémoire ou la DB + jobsMu.Lock() + job, exists := jobs[id] + jobsMu.Unlock() + if !exists { + var j runner.DownloadJob + if err := db.First(&j, "id = ?", id).Error; err != nil { + unmarkInFlight(id) + c.JSON(http.StatusNotFound, gin.H{"error": "Job introuvable"}) + return + } + jobCopy := j + jobsMu.Lock() + jobs[id] = &jobCopy + job = &jobCopy + jobsMu.Unlock() + } - clt := client.NewClient(db) - account := runner.GetFirstActiveAccount(clt) - if account == nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "Aucun compte actif"}) - return - } - clt.SetAccount(account) + // Init client Debrid-Link + clt := client.NewClient(db) + account := runner.GetFirstActiveAccount(clt) + if account == nil { + unmarkInFlight(id) + c.JSON(http.StatusBadRequest, gin.H{"error": "Aucun compte Debrid-Link actif"}) + return + } + clt.SetAccount(account) - go runner.StartDownload(job, job.Link, clt, db) - runner.Broadcast() + // Reprise asynchrone avec limite de parallélisme + go func(job *runner.DownloadJob) { + sem <- struct{}{} // bloque si trop de jobs en cours + defer func() { + <-sem + unmarkInFlight(job.ID) + runner.Broadcast() + }() - c.Status(http.StatusNoContent) - } + runner.StartDownload(job, job.Link, clt, db) + }(job) + + runner.Broadcast() + c.Status(http.StatusNoContent) + } } func HandleDeleteJob(db *gorm.DB) gin.HandlerFunc { return func(c *gin.Context) {