
    ,iD              
         d dl mZ d dlZd dlZd dlmZ d dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZ d dlmZmZmZ d dl d dlmZmZmZ  ej>                  e       Z! e"edd      Z# e"edd      Z$ e"edd      Z% e"edd      Z& e"edd      Z'dZ(d5dZ)e(fd6dZ*d7dZ+d7dZ,	 d dl-m.Z.m/Z/ dZ0e0r  e.d!d"g       Z2 e/d#d$d%g      Z3 e.d&d'g       Z4ndxZ2xZ3Z4d8d(Z5d9d:d)Z6 ede1fd*dd+didd,d-.      d/e$e'd0	 	 	 	 	 d;d1       Z7 edd*dd+didd2d-3      d/e8e9d0	 	 	 	 	 d;d4       Z:y# e1$ r d Z0Y w xY w)<    )annotationsN)sleep)shared_task)SoftTimeLimitExceeded)settings)cache)AdsManualCloudSerializer)get_secret_key)_mark_sent_by_urls_mark_inactive_sent)_mark_by_response_207)_count_pending_pending_qs_take_batch)*)_inactive_lock_key_normalize_limittake_inactive_batchADS_CLOUD_URLz5https://www.hously.cloud/space/receive-advertisement/ADS_SEND_BATCH_SIZE   ADS_SEND_RETRY_LIMIT   ADS_SEND_BASE_DELAYg      ?ADS_SEND_TIMEOUT   i  c                      y)Nsend_ads_to_cloud_lock r       '/var/www/extractly/houslyspace/tasks.py_send_lock_keyr"   )   s    #r    c                0    t        j                  | d|      S )N1)r   add)keyttls     r!   _acquire_lockr(   .   s    99S#s##r    c                8    t        j                  | t               y N)r   touchLOCK_TTL_SECr&   s    r!   _refresh_lockr.   2   s    	KK\"r    c                .    t        j                  |        y r*   )r   deleter-   s    r!   _release_lockr1   6   s    	LLr    )GaugeCounterTFads_pending_sendzPending ads to sendads_sent_totalzTotal ads sent to cloudresultads_last_send_tszLast send ads timestampc                ,    | | j                  |       y y r*   )set)gvalues     r!   	_prom_setr<   J   s    }	e r    c                J    | !| j                  |      j                  |       y y r*   )labelsinc)clabelr;   s      r!   	_prom_incrB   O   s"    }	E" r    <   max_retriessend_to_cloudz20/m)bindautoretry_forretry_backoffretry_jitterretry_kwargs	acks_latequeue
rate_limitpipelinemodelimittimeoutc                  t               }d}	 |dk(  r%t        |      }|sddi|dk(  r|rt        |       S S S t               }d| dd}t	               }t
        rt        t        t        |             |dk(  rJt
        r+t        t        t        t        j                                      d	dd
|dk(  r|rt        |       S S S d}	t        |      }
|
sJt
        r+t        t        t        t        j                                      d|d
|dk(  r|rt        |       S S S t        |
d      j                  }|D cg c]  }|j                  d      s| }}|D ch c]  }|d   	 }}|sddi|dk(  r|rt        |       S S S d}|t        k  rv|dz  }	 t!        j"                  t$        d|i||      }	 t&        j)                  d|j*                  |j,                  dd        |j*                  dv rEt1        |       t3        |      }	t
        rt5        t6        d|	       t&        j9                  d|	       n|j*                  dk(  r	 |j;                         }t=        ||      }	|	t3        |      k(  r.t
        rt5        t6        d|	       t&        j9                  d|	       n_|	dkD  r;t
        rt5        t6        d|	       t&        j?                  d|	t3        |      |	z
         nt&        j?                  dt3        |             n |j*                  dv r|j,                  xs djA                         }d |v sd!|v rDt1        |       t3        |      }	t
        rt5        t6        d"|	       t&        j9                  d#|	       ntB        d$|dz
  z  z  }t&        j?                  d%|j*                  |t        |       tE        |       nAtB        d$|dz
  z  z  }t&        j?                  d%|j*                  |t        |       tE        |       |t        k  rvt&        jI                  d't               d|tK        |	      tK        |      tK        |      d(}|dk(  r4t	               dkD  r'tM        |       | jO                  d||d)*       d|d+<   nd|d+<   t
        r+t        t        t        t        j                                      ||dk(  r|rt        |       S S S c c}w c c}w # t.        $ r Y w xY w# t.        $ r i }Y w xY w# t         jF                  $ rB}tB        d$|dz
  z  z  }t&        j?                  d&|t        ||       tE        |       Y d}~Id}~ww xY w# tP        $ r! |dk(  rdd,icY |dk(  r|rt        |       S S S  w xY w# |dk(  r|rt        |       w w w xY w)-u   
    Wysyłka ogłoszeń (upsert po URL).
    - "batch": jedna paczka i koniec
    - "pipeline": powtarza się dopóki są pendingi (z lockiem)
    FrN   statusskipped_lockedBearer application/jsonAuthorizationzContent-Typer   empty)rT   pendingempty_batchT)manyurlno_urls_in_batch   datajsonheadersrR   z[cloud] HTTP %s, body: %sNi         okz[cloud] batch OK (%s items).   z-[cloud] batch 207 -> all marked success (%s).partialz,[cloud] batch PARTIAL: success=%s, failed=%sz+[cloud] batch PARTIAL: success=0, failed=%s)i  i   zalready exists	duplicate
duplicatesz0[cloud] batch DUPLICATES -> marked as sent (%s).   u6   [cloud] HTTP %s on attempt %s/%s. Retrying in %.1fs…u@   [cloud] NETWORK error on attempt %s/%s: %s. Retrying in %.1fs…z&[cloud] batch FAILED after %s retries.)rT   rP   	processedrQ   pending_beforerO   kwargsrequeuedsoft_timeout))r"   r(   r1   r
   r   _PROMr<   G_PENDING_SENDfloatG_LAST_SEND_TStimer   r	   ra   getSEND_RETRY_LIMITrequestspost	CLOUD_URLloggerdebugstatus_codetext	Exceptionr   lenrB   C_SENT_TOTALinforc   r   warninglowerSEND_BASE_DELAYr   RequestExceptionerrorintr.   apply_asyncr   )selfrP   rQ   rR   lock_keyacquired
secret_keyrd   rp   processed_totalbatch
serializedit	page_urlsattemptresppayloadlowdelayer6   s                        r!   task_send_ads_to_cloudr   Y   sq   , HHS$:$X.H "23\ :((# #+Y $%
&-j\$:L^_')neN&;<Q.%		*<=%!4B :((# #+ E".%		*<=+Gt :((# #+q .e$?DD
#-?:Rb:
?)342RY	401f :((# #+c ((qLGP}}Yfj5ISZdklLL!<d>N>NPTPYPYZ_[_P`a ##z1&y1&))nO!,oFKK >P%%,%"&))+ '<GY&OO&#i.8 %lD/J$SUde(1, %lIOJ+	N_< 'TVYZcVde%%399?113C'3.+2D*95*-i. %lL/R$VXgh+qWq[/ABENNP((( %L ,qWq[/ABENNP((( %LO ((h LLACST _-Z!.1
 :."2Q"6(#Z%T[$\]!%F:!&F:neDIIK&89 :((# #+m @4 !  % %"$%f ,, 	'11+=>V$ e	D ! :n-- :((# #+ 	
 :((# #+s  U A5U AU U 6SSU S#U ?U S/ 1.S AS/ 0U 2S/ S B'S/ 9U ;A7S/ 2U 3BS/ 6	U B-U 
U 	SS/ SS/ S,(S/ +S,,S/ /U7T?9U ?UU U1U4 /U11U4 4V
inactive)rF   rH   rI   rJ   rK   rL   rM   c               
   t               }d}t        |      }	 |dk(  r%t        |      }|sddi|dk(  r|rt        |       S S S t	               }d| dd}t        |      }|sddi|dk(  r|rt        |       S S S |D 	
cg c]  \  }	}
}|

 }}
}	}|D 
ch c]  }
|
j                  d	      s|
d	    }}
d
}d
}|t        k  r|dz  }	 t        j                  t        d|i||      }	 t        j                  d|j                  |j                  dd        |j                  dv r.t!        |       t#        |      }t        j%                  d|       nu|j                  dk(  ri }	 |j'                         }|j                  d      xs g }t/               }t/               }|D ]  }t1        |t2              s|j                  d	      }|j                  d      }|s:|j                  d      xs i }t1        |t2              rlg }|j5                         D ]E  }t1        |t6              r|j9                  d |D               ,|j;                  t=        |             G dj?                  |      }nt=        |      }|dv r|jA                  |       |dk(  rd|v r|jA                  |       |jA                  |        |D 
cg c]  \  }}
}|
j                  d	      |v s||
|f! } }
}}| rt!        |        t#        |       }t        j)                  dt#        |       t#        |             nt*        d|dz
  z  z  }t        j)                  d|j                  |t        |       t-        |       	 |t        k  r|D ]I  \  }}!}	 tE        |d!d      }"|"3|"jF                  xs d
dz   |"_#        d"|"_$        |"jK                  d#d$g%       K d&tM        |      |tM        |      d'}#|dk(  r,|d
kD  r'tO        |       | jQ                  d||d()       d*|#d+<   nd|#d+<   |#|dk(  r|rt        |       S S S c c}}
}	w c c}
w # t        $ r Y .w xY w# t        $ rW}t        j)                  d|       t*        d|dz
  z  z  }t        j)                  d||t               t-        |       Y d}~d}~ww xY wc c}}
}w # t        jB                  $ rB}t*        d|dz
  z  z  }t        j)                  d ||t        |       t-        |       Y d}~d}~ww xY w# t        $ r Y w xY w# tR        $ r! |dk(  rdd,icY |dk(  r|rt        |       S S S  w xY w# |dk(  r|rt        |       w w w xY w)-z
    Sends inactive ads info (mark-as-inactive) to cloud.

    - mode == "batch": single batch and exit
    - mode == "pipeline": keeps re-queuing itself while there is work to do
      (with a distributed lock, so only one pipeline runs at a time).
    FrN   rT   rU   rV   rW   rX   rZ   r^   r   r`   ra   rb   z[inactive] HTTP %s, body: %sNi   re   z[inactive] batch OK (%s items).ri   z5[inactive] Failed to parse JSON from 207 response: %srn   zE[inactive] HTTP 207 with invalid JSON; retry in %.1fs (attempt %s/%s)resultserrorsc              3  2   K   | ]  }t        |        y wr*   )str).0xs     r!   	<genexpr>z2task_send_inactive_ads_to_cloud.<locals>.<genexpr>  s     0CAQs    >   createdupdatedrl   r   zObject not foundz)[inactive] PARTIAL: success=%s, failed=%sz0[inactive] HTTP %s attempt %s/%s; retry in %.1fsz:[inactive] NETWORK error %s; attempt %s/%s; retry in %.1fsinactive_synczFAILED after retriesattempts
last_error)update_fieldsrh   )rT   ro   rP   rQ   rO   rq   Trs   rt   )*r   r   r(   r1   r
   r   rz   INACTIVE_RETRY_LIMITr|   r}   INACTIVE_CLOUD_URLr   r   r   r   r   r   r   r   rc   r   INACTIVE_BASE_DELAYr   r9   
isinstancedictvalueslistextendappendr   joinr%   r   getattrr   r   saver   r.   r   r   )$r   rP   rQ   rR   r   r   r   rd   bunch_adp_hpayload_listurlsr   ro   r   r   r   r   r   failed	succeededitemustr   partsverrors_textadhsucceeded_bunch_psyncr6   s$                                       r!   task_send_inactive_ads_to_cloudr     s   . "#HHU#Eu$:$X.H "23^ :((# #+[ $%
&-j\$:L^_ $E*g&N :((# #+K /44elsAre4".?,Q!%%,%,?	,,qLGz}}& ,/##	LL6((		%4( ##z1'. #L 1IKK A9M %%, G!"&))+" &kk)4:G UF #I ')$5$ HHUO!XXh/ $!%(!3!9r &fd3$&E%+]]_#-a#6$)LL0C0C$C$)LLQ$8	 &5
 +.((5/K*-f+K
 !DD%MM!,7]/A[/P%MM!,"JJqM? !(D EJ&gEjr1aQUUSX\]fMfAqzEO&g&+O< #O 4INNCO,F  0113EFENNJ((, %Le ,,B $
B"2=D')-);!q(@*@		\0J	K $ !s9~tVYZ_V`a:)a-(#Z%T[$\]!%F:!&F: :((# #+K 5?. !   % !S
 !4qWq[7I Jc!#0	 e !n 'h6 ,, 	+qWq[/ABP( e	( ! " ! :n-- :((# #+ 	 :((# #+s+  T "T :T  P-T P4,P43T 	R3 ).P9 :R3 T R3 &Q	 6ER3 R, R,'AR3 ,T -AR3 .
T :	T ATAT -T 9	QR3 QR3 		R)AR$R3 "T $R))
R3 3T7T=T TT 	TT TT U-U UU U)returnr   )r&   r   r'   r   r   bool)r&   r   )r;   rw   )r`   )rA   r   r;   r   )rP   r   rQ   r   rR   r   );
__future__r   loggingry   r   r|   celeryr   celery.exceptionsr   django.confr   django.core.cacher   extractly.serializersr	   houslyspace.utils.get_secretr
   houslyspace.utils.mark_inactiver   r   houslyspace.utils.code_207r   houslyspace.utils.pendingr   r   r   houslyspace.utils.qs_inactiver   r   r   	getLogger__name__r   r   r~   SEND_BATCH_SIZEr{   r   SEND_TIMEOUTr,   r"   r(   r.   r1   prometheus_clientr2   r3   ru   r   rv   r   rx   r<   rB   r   INACTIVE_BATCH_SIZEINACTIVE_TIMEOUTr   r   r    r!   <module>r      s    "      3   # : 7 T < N N ,  
		8	$ Ho/fg	($93?8%;Q? ($93?x!3R8$
 (4 $#
0E 	-/DbIN+-F
SL-/H"MN599N9\N
# 	,#
	  b$ b$ 	b$
 b$	b$X 	#
 $#G$ G$ 	G$
 G$G$s  Es   =
D? ?E	E	