# manual_agregator/tasks.py
from __future__ import annotations

import logging
import time
from time import sleep

import requests
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from django.conf import settings
from django.core.cache import cache
from extractly.serializers import AdsManualCloudSerializer
from houslyspace.utils.get_secret import get_secret_key

# helpers from the project's utils
from houslyspace.utils.mark_inactive import _mark_sent_by_urls, _mark_inactive_sent
from houslyspace.utils.code_207 import _mark_by_response_207
from houslyspace.utils.pending import _count_pending, _pending_qs, _take_batch

# INACTIVE utils (✅ corrected module name); the INACTIVE_* settings used below presumably come from this star import
from houslyspace.utils.qs_inactive import *
from houslyspace.utils.qs_inactive import (
    _inactive_lock_key,
    _normalize_limit,
    take_inactive_batch,
)

logger = logging.getLogger(__name__)

# ───────────────────────────────── config (can be moved to settings.py) ─────────────────────────────────
# Each value is read from Django settings (ADS_* names); the literal is the fallback.
CLOUD_URL = getattr(
    settings,
    "ADS_CLOUD_URL",
    "https://www.hously.cloud/space/receive-advertisement/",
)
SEND_BATCH_SIZE = getattr(settings, "ADS_SEND_BATCH_SIZE", 150)  # default `limit` of the send task
SEND_RETRY_LIMIT = getattr(settings, "ADS_SEND_RETRY_LIMIT", 3)  # POST attempts per batch
SEND_BASE_DELAY = getattr(settings, "ADS_SEND_BASE_DELAY", 0.5)  # first back-off delay; doubled per attempt
SEND_TIMEOUT = getattr(settings, "ADS_SEND_TIMEOUT", 20)  # default `timeout` handed to requests.post

# Timeout used for the cache-based locks (15 * 60).
LOCK_TTL_SEC = 15 * 60


def _send_lock_key() -> str:
    """Return the cache key of the lock that guards the send pipeline.

    The key is global; extend it (e.g. per source) if finer-grained locking
    is ever needed.
    """
    key = "send_ads_to_cloud_lock"
    return key


def _acquire_lock(key: str, ttl: int = LOCK_TTL_SEC) -> bool:
    """Try to take the lock stored under *key*.

    Calls ``cache.add`` with the value ``"1"`` and *ttl* as the timeout and
    returns whatever it reports.
    """
    taken = cache.add(key, "1", ttl)
    return taken


def _refresh_lock(key: str, ttl: int = LOCK_TTL_SEC) -> bool:
    """Reset the timeout of the lock stored under *key*.

    Args:
        key: Cache key of the lock.
        ttl: New timeout in seconds; defaults to ``LOCK_TTL_SEC`` so existing
            one-argument callers behave as before.

    Returns:
        The truthiness of ``cache.touch``: ``False`` means the key was not
        touched (e.g. it no longer exists), so the caller does not hold the
        lock any more.
    """
    return bool(cache.touch(key, ttl))


def _release_lock(key: str) -> None:
    """Drop the lock by deleting *key* from the cache."""
    cache.delete(key)


# ───────── Prometheus (optional) ─────────
# Metrics exist only when prometheus_client can be imported; otherwise _PROM is
# False and the three metric names are bound to None.
try:
    from prometheus_client import Counter, Gauge  # type: ignore
except Exception:
    _PROM = False
    G_PENDING_SEND = C_SENT_TOTAL = G_LAST_SEND_TS = None
else:
    _PROM = True
    G_PENDING_SEND = Gauge("ads_pending_send", "Pending ads to send", [])
    C_SENT_TOTAL = Counter("ads_sent_total", "Total ads sent to cloud", ["result"])
    G_LAST_SEND_TS = Gauge("ads_last_send_ts", "Last send ads timestamp", [])


def _prom_set(g, value: float):
    """Set gauge *g* to *value*; do nothing when *g* is None (metrics disabled)."""
    if g is None:
        return
    g.set(value)


def _prom_inc(c, label: str, value: int = 1):
    """Increase the *label* child of counter *c* by *value*; no-op when *c* is None."""
    if c is None:
        return
    child = c.labels(label)
    child.inc(value)






# ───────────────────────────────────── sender: ACTIVE ADS ─────────────────────────────────────
def _pause_before_send_retry(attempt: int, what: str, detail: str = "") -> None:
    """Log a failed send attempt and back off only if another attempt follows.

    Args:
        attempt: 1-based number of the attempt that just failed.
        what: Short failure label, e.g. ``"HTTP 500"`` or ``"NETWORK error"``.
        detail: Optional text appended right after the attempt counter.
    """
    if attempt >= SEND_RETRY_LIMIT:
        # Nothing will be retried: sleeping would only keep the worker (and the
        # pipeline lock) busy for no benefit.
        logger.warning(
            "[cloud] %s on attempt %s/%s%s. No retries left.",
            what,
            attempt,
            SEND_RETRY_LIMIT,
            detail,
        )
        return

    delay = SEND_BASE_DELAY * (2 ** (attempt - 1))
    logger.warning(
        "[cloud] %s on attempt %s/%s%s. Retrying in %.1fs…",
        what,
        attempt,
        SEND_RETRY_LIMIT,
        detail,
        delay,
    )
    sleep(delay)


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=60,
    retry_jitter=True,
    retry_kwargs={"max_retries": 3},
    acks_late=True,
    queue="send_to_cloud",
    rate_limit="20/m",
)
def task_send_ads_to_cloud(
    self,
    *,
    mode: str = "pipeline",       # "batch" | "pipeline"
    limit: int = SEND_BATCH_SIZE, # batch size
    timeout: int = SEND_TIMEOUT,
):
    """
    Send one batch of pending ads to the cloud endpoint (upsert by URL).

    Args:
        mode: ``"batch"`` sends a single batch and finishes; ``"pipeline"``
            takes the global send lock and re-enqueues itself for as long as
            pending ads remain.
        limit: Number handed to ``_take_batch`` as the batch size.
        timeout: Timeout handed to ``requests.post``.

    Returns:
        A dict with a ``"status"`` key. Early exits use ``"skipped_locked"``,
        ``"empty"``, ``"empty_batch"``, ``"no_urls_in_batch"`` or
        ``"soft_timeout"``. Otherwise the status is ``"ok"`` and the dict also
        carries ``"mode"``, ``"processed"``, ``"limit"``, ``"pending_before"``
        and ``"requeued"``; ``"processed"`` is 0 when every attempt failed.

    Raises:
        SoftTimeLimitExceeded: re-raised in ``"batch"`` mode; in pipeline mode
            it is turned into the ``"soft_timeout"`` result instead.
    """
    lock_key = _send_lock_key()
    acquired = False

    try:
        if mode == "pipeline":
            acquired = _acquire_lock(lock_key)
            if not acquired:
                return {"status": "skipped_locked"}

        secret_key = get_secret_key()
        # if the backend also accepts a "bare" secret, this can go back to plain secret_key
        headers = {"Authorization": f"Bearer {secret_key}", "Content-Type": "application/json"}

        pending_before = _count_pending()
        # The _prom_* helpers are no-ops when metrics are disabled.
        _prom_set(G_PENDING_SEND, float(pending_before))

        if pending_before == 0:
            _prom_set(G_LAST_SEND_TS, float(time.time()))
            return {"status": "empty", "pending": 0}

        batch = _take_batch(limit)
        if not batch:
            _prom_set(G_LAST_SEND_TS, float(time.time()))
            return {"status": "empty_batch", "pending": pending_before}

        # Results are marked by URL, so records without one are dropped.
        serialized = [it for it in AdsManualCloudSerializer(batch, many=True).data if it.get("url")]
        if not serialized:
            return {"status": "no_urls_in_batch"}
        page_urls = {it["url"] for it in serialized}

        processed_total = 0
        for attempt in range(1, SEND_RETRY_LIMIT + 1):
            # Only the HTTP call itself is treated as a network failure.
            try:
                resp = requests.post(CLOUD_URL, json={"data": serialized}, headers=headers, timeout=timeout)
            except requests.RequestException as e:
                _pause_before_send_retry(attempt, "NETWORK error", f": {e}")
                continue

            # diagnostics (truncated so the logs are not flooded)
            try:
                logger.debug("[cloud] HTTP %s, body: %s", resp.status_code, resp.text[:1024])
            except Exception:
                # Diagnostics only: never fail the task because of logging.
                pass

            status = resp.status_code

            if status in (200, 201):
                _mark_sent_by_urls(page_urls)
                processed_total = len(page_urls)
                _prom_inc(C_SENT_TOTAL, "ok", processed_total)
                logger.info("[cloud] batch OK (%s items).", processed_total)
                break

            if status == 207:
                try:
                    payload = resp.json()
                except ValueError:
                    payload = {}

                # `or 0` keeps the int() conversion below safe should the helper return None.
                processed_total = _mark_by_response_207(payload, page_urls) or 0
                if processed_total == len(page_urls):
                    _prom_inc(C_SENT_TOTAL, "ok", processed_total)
                    logger.info("[cloud] batch 207 -> all marked success (%s).", processed_total)
                elif processed_total > 0:
                    _prom_inc(C_SENT_TOTAL, "partial", processed_total)
                    logger.warning(
                        "[cloud] batch PARTIAL: success=%s, failed=%s",
                        processed_total,
                        len(page_urls) - processed_total,
                    )
                else:
                    logger.warning("[cloud] batch PARTIAL: success=0, failed=%s", len(page_urls))
                # A 207 batch is never re-sent within this run.
                break

            if status in (400, 409):
                low = (resp.text or "").lower()
                if "already exists" in low or "duplicate" in low:
                    _mark_sent_by_urls(page_urls)
                    processed_total = len(page_urls)
                    _prom_inc(C_SENT_TOTAL, "duplicates", processed_total)
                    logger.info("[cloud] batch DUPLICATES -> marked as sent (%s).", processed_total)
                    break

            # Everything else (including non-duplicate 400/409) is retried.
            _pause_before_send_retry(attempt, f"HTTP {status}")
        else:
            logger.error("[cloud] batch FAILED after %s retries.", SEND_RETRY_LIMIT)

        result = {
            "status": "ok",
            "mode": mode,
            "processed": int(processed_total),
            "limit": int(limit),
            "pending_before": int(pending_before),
        }

        # 👉 do not stop when nothing went through; keep going while ads are pending
        requeue = mode == "pipeline" and _count_pending() > 0
        if requeue:
            # Give the lock up BEFORE enqueueing the successor: if it were still
            # held when the next task starts, that task would return
            # "skipped_locked" and the pipeline would stall.
            _release_lock(lock_key)
            acquired = False
            self.apply_async(kwargs={"mode": "pipeline", "limit": limit, "timeout": timeout})
        result["requeued"] = requeue

        _prom_set(G_LAST_SEND_TS, float(time.time()))

        return result

    except SoftTimeLimitExceeded:
        if mode == "pipeline":
            return {"status": "soft_timeout"}
        raise
    finally:
        if mode == "pipeline" and acquired:
            _release_lock(lock_key)






# ───────────────────────────────────── sender: INACTIVE ADS ─────────────────────────────────────


def _inactive_retry_delay(attempt: int) -> float | None:
    """Return the back-off delay to wait after failed *attempt*, or None if it was the last one."""
    if attempt >= INACTIVE_RETRY_LIMIT:
        return None
    return INACTIVE_BASE_DELAY * (2 ** (attempt - 1))


def _flatten_inactive_errors(errors) -> str:
    """Join the values of a per-item ``errors`` entry into one searchable string."""
    if not isinstance(errors, dict):
        return str(errors)

    parts = []
    for value in errors.values():
        if isinstance(value, list):
            parts.extend(str(x) for x in value)
        else:
            parts.append(str(value))
    return " ".join(parts)


def _classify_inactive_results(results) -> tuple[set, set]:
    """Split the per-item results of a 207 response into ``(succeeded, failed)`` URL sets.

    Items that are not dicts or carry no ``"url"`` are ignored; anything that
    is not a list yields two empty sets.
    """
    succeeded: set = set()
    failed: set = set()
    if not isinstance(results, list):
        return succeeded, failed

    for item in results:
        if not isinstance(item, dict):
            continue
        url = item.get("url")
        if not url:
            continue

        status = item.get("status")
        errors_text = _flatten_inactive_errors(item.get("errors") or {})

        # Treated as success:
        # - "updated" / "created" / "duplicate"
        # - "error" with "Object not found" -> pragmatically considered "done"
        if status in {"updated", "created", "duplicate"}:
            succeeded.add(url)
        elif status == "error" and "Object not found" in errors_text:
            succeeded.add(url)
        else:
            failed.add(url)

    return succeeded, failed


@shared_task(
    bind=True,
    # NOTE(review): there is no `autoretry_for` here, so the retry_* options
    # below presumably have no effect – confirm whether that is intended.
    retry_backoff=60,
    retry_jitter=True,
    retry_kwargs={"max_retries": 3},
    acks_late=True,
    queue="inactive",
    rate_limit="20/m",
)
def task_send_inactive_ads_to_cloud(
    self,
    *,
    mode: str = "pipeline",  # "batch" | "pipeline"
    limit: int = INACTIVE_BATCH_SIZE,
    timeout: int = INACTIVE_TIMEOUT,
):
    """
    Send one batch of inactive-ad notifications (mark-as-inactive) to the cloud.

    Args:
        mode: ``"batch"`` sends a single batch and exits; ``"pipeline"`` takes
            the inactive lock and re-enqueues itself after every batch that
            made progress, so only one pipeline runs at a time.
        limit: Requested batch size; passed through ``_normalize_limit`` first.
        timeout: Timeout handed to ``requests.post``.

    Returns:
        A dict with a ``"status"`` key. Early exits use ``"skipped_locked"``,
        ``"empty"`` or ``"soft_timeout"``. Otherwise the status is ``"ok"`` and
        the dict also carries ``"processed"``, ``"mode"``, ``"limit"`` and
        ``"requeued"``; ``"processed"`` is 0 when every attempt failed.

    Raises:
        SoftTimeLimitExceeded: re-raised in ``"batch"`` mode; in pipeline mode
            it is turned into the ``"soft_timeout"`` result instead.
    """
    lock_key = _inactive_lock_key()
    acquired = False
    limit = _normalize_limit(limit)

    try:
        # Acquire lock for pipeline mode
        if mode == "pipeline":
            acquired = _acquire_lock(lock_key)
            if not acquired:
                return {"status": "skipped_locked"}

        secret_key = get_secret_key()
        headers = {"Authorization": f"Bearer {secret_key}", "Content-Type": "application/json"}

        # Take batch of inactive ads to send; each entry is unpacked as (ad, payload, helper)
        bunch = take_inactive_batch(limit)
        if not bunch:
            return {"status": "empty"}

        payload_list = [p for (_ad, p, _h) in bunch]
        processed = 0

        for attempt in range(1, INACTIVE_RETRY_LIMIT + 1):
            # Only the HTTP call itself is treated as a network failure.
            try:
                resp = requests.post(
                    INACTIVE_CLOUD_URL,
                    json={"data": payload_list},
                    headers=headers,
                    timeout=timeout,
                )
            except requests.RequestException as e:
                delay = _inactive_retry_delay(attempt)
                if delay is None:
                    logger.warning(
                        "[inactive] NETWORK error %s; attempt %s/%s; no retries left",
                        e,
                        attempt,
                        INACTIVE_RETRY_LIMIT,
                    )
                else:
                    logger.warning(
                        "[inactive] NETWORK error %s; attempt %s/%s; retry in %.1fs",
                        e,
                        attempt,
                        INACTIVE_RETRY_LIMIT,
                        delay,
                    )
                    sleep(delay)
                continue

            # Debug body for diagnostics (truncated)
            try:
                logger.debug(
                    "[inactive] HTTP %s, body: %s",
                    resp.status_code,
                    resp.text[:2048],
                )
            except Exception:
                # Do not crash task on logging failure
                pass

            status = resp.status_code

            # ───────── 200 / 201: all OK ─────────
            if status in (200, 201):
                _mark_inactive_sent(bunch)
                processed = len(payload_list)
                logger.info("[inactive] batch OK (%s items).", processed)
                break

            # ───────── 207: partial success / errors ─────────
            if status == 207:
                try:
                    payload = resp.json()
                    if not isinstance(payload, dict):
                        # Valid JSON but not an object: there is no "results" to read.
                        raise ValueError(f"expected a JSON object, got {type(payload).__name__}")
                except ValueError as e:
                    logger.warning(
                        "[inactive] Failed to parse JSON from 207 response: %s",
                        e,
                    )
                    # Without a usable body we cannot know what happened; treat as failure and retry
                    delay = _inactive_retry_delay(attempt)
                    if delay is None:
                        logger.warning(
                            "[inactive] HTTP 207 with invalid JSON; no retries left (attempt %s/%s)",
                            attempt,
                            INACTIVE_RETRY_LIMIT,
                        )
                    else:
                        logger.warning(
                            "[inactive] HTTP 207 with invalid JSON; retry in %.1fs (attempt %s/%s)",
                            delay,
                            attempt,
                            INACTIVE_RETRY_LIMIT,
                        )
                        sleep(delay)
                    continue

                succeeded, failed = _classify_inactive_results(payload.get("results"))

                # Map back to the original `bunch` (ad, payload, helper)
                succeeded_bunch = [(ad, p, h) for (ad, p, h) in bunch if p.get("url") in succeeded]
                if succeeded_bunch:
                    _mark_inactive_sent(succeeded_bunch)

                processed = len(succeeded_bunch)
                logger.warning(
                    "[inactive] PARTIAL: success=%s, failed=%s",
                    len(succeeded_bunch),
                    len(failed),
                )

                # We don't retry the same batch again in this run – leave the retry loop
                break

            # ───────── other HTTP errors ─────────
            delay = _inactive_retry_delay(attempt)
            if delay is None:
                logger.warning(
                    "[inactive] HTTP %s attempt %s/%s; no retries left",
                    status,
                    attempt,
                    INACTIVE_RETRY_LIMIT,
                )
            else:
                logger.warning(
                    "[inactive] HTTP %s attempt %s/%s; retry in %.1fs",
                    status,
                    attempt,
                    INACTIVE_RETRY_LIMIT,
                    delay,
                )
                sleep(delay)

        else:
            # Final failure after all retries – mark attempts / last_error if such relation exists
            logger.error("[inactive] batch FAILED after %s retries.", INACTIVE_RETRY_LIMIT)
            for ad, _p, _h in bunch:
                try:
                    sync = getattr(ad, "inactive_sync", None)
                    if sync is not None:
                        sync.attempts = (sync.attempts or 0) + 1
                        sync.last_error = "FAILED after retries"
                        sync.save(update_fields=["attempts", "last_error"])
                except Exception:
                    # Best-effort only, but leave a trace instead of failing silently.
                    logger.debug("[inactive] could not record the failed attempt", exc_info=True)

        # ───────── result + requeue logic ─────────
        result = {"status": "ok", "processed": int(processed), "mode": mode, "limit": int(limit)}

        # There was progress: keep the pipeline alive while there may still be work.
        requeue = mode == "pipeline" and processed > 0
        if requeue:
            # Give the lock up BEFORE enqueueing the successor: if it were still
            # held when the next task starts, that task would return
            # "skipped_locked" and the pipeline would stall.
            _release_lock(lock_key)
            acquired = False
            self.apply_async(kwargs={"mode": "pipeline", "limit": limit, "timeout": timeout})
        result["requeued"] = requeue

        return result

    except SoftTimeLimitExceeded:
        if mode == "pipeline":
            return {"status": "soft_timeout"}
        raise

    finally:
        if mode == "pipeline" and acquired:
            _release_lock(lock_key)
