
    j#                        d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZmZ d*d	Zd+dZ	 d,ddddd-dZd,d.dZd,d/dZd0d1dZd2dZ	 	 	 d3d4d#Zd5d6d&Zd7d8d)ZdS )9a4  SQLite-backed durable task queue.

This module replaces in-process ``threading.Thread`` dispatching for heavy
workloads such as training, inference, and face library rebuilds. Tasks survive
Web-process restarts and can be consumed by a dedicated ``worker.py`` process.

The queue is stored in the same SQLite database used by the rest of the app
(``SQLITE_DB_PATH``).

Producer example::

    from shared.task_queue import submit_task

    task_id = submit_task("train", payload={"dataset_id": "...", ...})

Consumer example::

    from shared.task_queue import claim_task, complete_task, fail_task

    row = claim_task("train")
    if row:
        try:
            do_work(row["payload"])
            complete_task(row["id"], result={...})
        except Exception as e:
            fail_task(row["id"], error=str(e))
    )annotationsN)Any)uuid4)SQLITE_DB_PATHloggerreturnsqlite3.Connectionc                     t           j                            t                    } | rt          j        | d           t          j        t          d          }t
          j        |_        |	                    d           |S )NT)exist_ok   )timeoutzPRAGMA journal_mode=WAL)
ospathdirnamer   makedirssqlite3connectRowrow_factoryexecute)parentconns     3/home/longshao/multi-rider-rag/shared/task_queue.py_connectr   -   sh    W__^,,F +
FT****?>2666D{DLL*+++K    r   Nonec                    |                      d           |                      d           |                      d           dS )zrCreate the ``task_queue`` table if it does not exist.

    Called once from :func:`shared.db.sqlite.init_db`.
    at  
        CREATE TABLE IF NOT EXISTS task_queue (
            id          TEXT PRIMARY KEY,
            task_type   TEXT NOT NULL,
            status      TEXT NOT NULL DEFAULT 'pending',
            payload     TEXT NOT NULL DEFAULT '{}',
            result      TEXT NOT NULL DEFAULT '{}',
            error       TEXT NOT NULL DEFAULT '',
            owner_key   TEXT NOT NULL DEFAULT '',
            owner_ip    TEXT NOT NULL DEFAULT '',
            created_ts  INTEGER NOT NULL DEFAULT 0,
            claimed_ts  INTEGER,
            finished_ts INTEGER,
            retries     INTEGER NOT NULL DEFAULT 0
        )
        zVCREATE INDEX IF NOT EXISTS idx_task_queue_status_type ON task_queue(status, task_type)zPCREATE INDEX IF NOT EXISTS idx_task_queue_created ON task_queue(created_ts DESC)N)r   )r   s    r   init_task_queue_tabler   ;   s_    
 	LL	  $ 	LL`   	LLZ    r    )	owner_keyowner_iptask_id	task_typestrpayloaddict[str, Any] | Noner    r!   r"   
str | Nonec          
     t   |pt                      j        }t          t          j                              }t	                      5 }|                    d|| t          j        |pi d          |||f           |                                 ddd           n# 1 swxY w Y   t          j
        d||            |S )z*Enqueue a new task and return its task id.z
            INSERT INTO task_queue (id, task_type, status, payload, owner_key, owner_ip, created_ts)
            VALUES (?, ?, 'pending', ?, ?, ?, ?)
            Fensure_asciiNztask submitted: %s type=%s)r   hexinttimer   r   jsondumpscommitr   info)r#   r%   r    r!   r"   nowr   s          r   submit_taskr3   ^   s     $G
dikk

C	 t
 
7=bu===	
 	
 	
 	                K,gyAAANs   ABBBc                Z   t          t          j                              }t                      5 }|                    d           	 | r*|                    d| f                                          }n'|                    d                                          }|"|                                 	 ddd           dS |                    d||d         f          }|j        dk    r"|                                 	 ddd           dS |                                 n## t          $ r |                                  w xY w	 ddd           n# 1 swxY w Y   	 t          j
        |d                   }n# t          $ r i }Y nw xY w|d         |d	         ||d
         |d         |d         dS )a  Atomically claim the oldest pending task.

    When *task_type* is provided, only tasks of that type are considered. The
    ``BEGIN IMMEDIATE`` transaction prevents multiple worker processes from
    selecting the same pending row before it is marked as running.
    zBEGIN IMMEDIATEz
                    SELECT * FROM task_queue
                    WHERE status='pending' AND task_type=?
                    ORDER BY created_ts ASC, id ASC
                    LIMIT 1
                    z
                    SELECT * FROM task_queue
                    WHERE status='pending'
                    ORDER BY created_ts ASC, id ASC
                    LIMIT 1
                    NzTUPDATE task_queue SET status='running', claimed_ts=? WHERE id=? AND status='pending'id   r%   r#   r    r!   
created_ts)r5   r#   r%   r    r!   r7   )r,   r-   r   r   fetchoner0   rowcountrollback	Exceptionr.   loads)r#   r2   r   rowcurr%   s         r   
claim_taskr?      sQ    dikk

C	 %t&'''#	 ll L  (**  ll  (**  {3% % % % % % % %6 ,,fc$i  C |q  C% % % % % % % %D KKMMMM 	 	 	MMOOO	 E% % % % % % % % % % % % % % %N*S^,,    $i%%
O,'  sH   EA)D==DDE D==EEEE4 4FFresultc                B   t          t          j                              }t                      5 }|                    dt	          j        |pi d          || f           |                                 ddd           n# 1 swxY w Y   t          j        d|            dS )z<Mark *task_id* as completed with an optional result payload.zLUPDATE task_queue SET status='completed', result=?, finished_ts=? WHERE id=?Fr)   Nztask completed: %s)	r,   r-   r   r   r.   r/   r0   r   r1   )r"   r@   r2   r   s       r   complete_taskrB      s    
dikk

C	 tZZ"59993H	
 	
 	
 	               K$g.....s   AA??BBerrorc                (   t          t          j                              }t                      5 }|                    d||| f           |                                 ddd           n# 1 swxY w Y   t          j        d| |dd                    dS )z/Mark *task_id* as failed with an error message.zHUPDATE task_queue SET status='failed', error=?, finished_ts=? WHERE id=?Nztask failed: %s  error=%s   )r,   r-   r   r   r0   r   warning)r"   rC   r2   r   s       r   	fail_taskrG      s    
dikk

C	 tVC!	
 	
 	
 	               N.ttEEEEEs   .A))A-0A-c                X   t                      5 }|                    d| f                                          }ddd           n# 1 swxY w Y   |dS t          |          }dD ]D}	 t	          j        |                    |          pd          ||<   0# t          $ r i ||<   Y Aw xY w|S )z3Return the current state of *task_id*, or ``None``.z#SELECT * FROM task_queue WHERE id=?Nr%   r@   {})r   r   r8   dictr.   r<   getr;   )r"   r   r=   dkeys        r   get_taskrO      s
   	 Ytll@7*MMVVXXY Y Y Y Y Y Y Y Y Y Y Y Y Y Y
{tS		A$  	Zc

 2d33AcFF 	 	 	AcFFF	Hs#   *AA	A	(,BB'&B'2   statuslimitr,   list[dict[str, Any]]c                   g }g }| r*|                     d           |                     |            |r*|                     d           |                     |           |rd                    |          nd}|                     |           t                      5 }|                    d| d|                                          }ddd           n# 1 swxY w Y   g }|D ]m}	t          |	          }
dD ]D}	 t          j        |
                    |          pd	          |
|<   0# t          $ r i |
|<   Y Aw xY w|                     |
           n|S )
z3List tasks filtered by *task_type* and/or *status*.ztask_type=?zstatus=?z AND z1=1zSELECT * FROM task_queue WHERE z! ORDER BY created_ts DESC LIMIT ?NrI   rJ   )
appendjoinr   r   fetchallrK   r.   r<   rL   r;   )r#   rQ   rR   clausesparamswherer   rowsresultsr=   rM   rN   s               r   
list_tasksr]      s    GF !}%%%i    z"""f%,7GLL!!!%E
MM%	 t||VeVVV
 
 (** 	              
 G  II( 	 	CAEE#JJ$6$77#   #qNs$   -CCC7,D$$D65D6  max_age_secondsc                ,   t          t          j                              | z
  }t                      5 }|                    d|f          }|                                 ddd           n# 1 swxY w Y   |j        }|rt          j        d||           |S )zGReset stale running tasks back to pending and return the changed count.zcUPDATE task_queue SET status='pending', retries=retries+1 WHERE status='running' AND claimed_ts < ?Nz(reset %d stale running tasks (cutoff=%d))r,   r-   r   r   r0   r9   r   r1   )r_   cutoffr   r>   counts        r   reset_stale_runningrc     s    /F	 tllqI
 
 	               LE O>vNNNLs   ,A**A.1A.r   daysc                    t          t          j                              | dz  z
  }t                      5 }|                    d|f          }|                                 ddd           n# 1 swxY w Y   |j        S )z0Delete completed/failed tasks older than *days*.iQ zRDELETE FROM task_queue WHERE status IN ('completed', 'failed') AND finished_ts < ?N)r,   r-   r   r   r0   r9   )rd   ra   r   r>   s       r   cleanup_old_tasksrf     s    u,F	 tll`I
 
 	               <s   ,A--A14A1)r   r	   )r   r	   r   r   )N)r#   r$   r%   r&   r    r$   r!   r$   r"   r'   r   r$   )r#   r'   r   r&   )r"   r$   r@   r&   r   r   )r   )r"   r$   rC   r$   r   r   )r"   r$   r   r&   )NNrP   )r#   r'   rQ   r'   rR   r,   r   rS   )r^   )r_   r,   r   r,   )r   )rd   r,   r   r,   )__doc__
__future__r   r.   r   r   r-   typingr   uuidr   shared.config.configr   r   r   r   r3   r?   rB   rG   rO   r]   rc   rf    r   r   <module>rm      s   8 # " " " " "  				               7 7 7 7 7 7 7 7      J &*      F; ; ; ; ;|	/ 	/ 	/ 	/ 	/	F 	F 	F 	F 	F     !    B    	 	 	 	 	 	 	r   