
    2h                       d dl mZ d dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZ d	Zdd
ZefddZddZdddd dZ edefddddiddd      	 d!ddddddddddddd	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 d"d       Zy)#    )annotations)shared_task)SoftTimeLimitExceeded)cache)transaction)process_manual_queue)clear_cache_task)backfill_main_imagestask_store_main_imagei  c           
     h    d| xs d d|xs dj                          dt        t        |             S )Nzmanual_pipeline_lock:ALL:*)lowerintbool)	manual_idnameforces      ,/var/www/extractly/manual_agregator/tasks.py	_lock_keyr      s=    "9#5"6a8K8K8M7NaPSTXY^T_P`Oabb    c                0    t        j                  | d|      S )N1)r   add)keyttls     r   _acquire_lockr      s    99S#s##r   c                .    t        j                  |        y N)r   delete)r   s    r   _release_lockr"      s    	LLr     F)backfill_limit	overwritec                B      fd}t        j                  |       y)z
    Schedule image tasks after the surrounding DB transaction commits.
    If no transaction is active, on_commit runs immediately.
    Add a small countdown to let replicas/caches catch up.
    c                     d} r.t        j                  t              t              dd|        y t	        j                  dt              id|        y )N   )ad_idr%   images)kwargsqueue	countdownlimit)r   apply_asyncr   r   r
   )	delay_secr$   only_idr%   s    r   _enqueuez._enqueue_images_after_commit.<locals>._enqueue%   sR    	!--!$WDOL# !,,^!45#r   N)r   	on_commit)r1   r$   r%   r2   s   ``` r   _enqueue_images_after_commitr4      s    $ (#r   T<   max_retries   i  manual)bindautoretry_forretry_backoffretry_jitterretry_kwargs	acks_latesoft_time_limitr,   Nbatch   moder.   r   r1   dry_runr   r   force_namesclear_cache_after
clear_keysclear_patterns	clear_allc               X   |dk(  r|rd}t        |||      }d}d}	 |dk(  r+t        |      }|sd|||d|dk(  r|r|st        |       S S S S t        t	        |      ||t        |      t        |      |t        |	xs g             }d|t	        |      t	        |      |||d}|dk(  rK|dkD  rFt        j                  |t               | j                  d||d	|d||	xs g |
|||d
d       d}d|d<   nRd|d<   |
r;|xs g }|xs ddg}t        j                  t        ||t        |      d      d       |st        |dd       ||dk(  r|r|st        |       S S S S # t        $ r' |dk(  r d|||dcY |dk(  r|r|st        |       S S S S  w xY w# |dk(  r|r|st        |       w w w w xY w)z
    Manual processing pipeline:
    - optional single-id mode (only_id) or batched 'limit'
    - uses a cache lock in 'pipeline' mode to keep a single runner
    - schedules image tasks strictly AFTER COMMIT via transaction.on_commit
    pipelineFskipped_locked)statusrC   r   r   )r.   name_filtersr1   rD   r   r   rE   ok)rM   rC   	processedr.   r   r1   r   r   NrB   r8   )r+   r,   Trequeuedzmanual_pipeline_lock:*zimg_upload_lock:*z	[manual] )keyspatternsrI   
log_prefixdefaultr#   )r1   r$   r%   soft_timeout)r   r   r"   r   r   r   setr   touchLOCK_TTL_SECr/   r	   dictr4   r   )selfprevious_task_resultrC   r.   r   r1   rD   r   r   rE   rF   rG   rH   rI   lock_keyacquiredrQ   rP   resulteff_keyseff_patternss                        r   task_process_manualrb   :   sQ   B zeD%0HHH]$:$X.H. !* 	p :(8(# 4<(a )e*Mu+[.B0
	 YZ"
 :)a-KK,/&" #&"!*#.#4"):",&4!*   " H!%F:!&F: !&," . a3KM`2a ,,%!-"&y/#.	 $ ,##'#  :(8(# 4<( ! :(&	  :(8(# 4<( 	 :(8(# 4<(s*   E C9E F2F FF F))returnstr)r   rd   r   r   rc   r   )r   rd   )r1   
int | Noner$   r   r%   r   r    )rC   rd   r.   r   r   z
str | Noner1   re   rD   r   r   r   r   re   rE   list[str] | NonerF   r   rG   rf   rH   rf   rI   r   )
__future__r   celeryr   celery.exceptionsr   django.core.cacher   	django.dbr   manual_agregator.run_parserr   abstractclass.tasksr	   image_agregator.tasksr
   r   rY   r   r   r"   r4   	Exceptionrb    r   r   <module>rq      s9   "  3 # ! < 0
 c (4 $ PTgl $6 	,#
	 |$  $(##''+|$ 	|$
 |$ |$ |$ |$ |$ |$ "|$ |$ !|$ %|$ |$	|$r   