# vim:set fileencoding=utf-8 et ts=4 sts=4 sw=4:
#
#   apt-listchanges - Show changelog entries between the installed versions
#                     of a set of packages and the versions contained in
#                     corresponding .deb files
#
#   Copyright (C) 2000-2006  Matt Zimmerman  <mdz@debian.org>
#   Copyright (C) 2006       Pierre Habouzit <madcoder@debian.org>
#   Copyright (C) 2016       Robert Luberda  <robert@debian.org>
#
#   This program is free software; you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation; either version 2 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program; if not, write to the Free Software
#   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
#

from __future__ import annotations
from typing import Any, cast

import copy
import datetime
import hashlib
import os
import pickle
import time

from apt_listchanges.ALCLog import debug
from apt_listchanges.ALChacks import _
from apt_listchanges.DebianFiles import ChangelogEntry


class DbError(Exception):
    pass


class DbStore:
    __slots__ = ('db_version', 'similar', 'exact', 'packages')

    def __init__(self) -> None:
        self.db_version: int = 2
        self.similar: dict[str, dict[bytes, int]] = {}
        self.exact: dict[str, dict[bytes, int]] = {}
        self.packages: dict[str, int] = {}

    def __add__(self, other: DbStore) -> DbStore:
        if self.__class__ != other.__class__:
            raise ValueError("Can only add DbStore to another DbStore")

        added = DbStore()

        added.similar = copy.deepcopy(self.similar)
        added.exact = copy.deepcopy(self.exact)
        added.packages = copy.deepcopy(self.packages)

        for path, checksums in other.similar.items():
            added.similar.setdefault(path, {}).update(checksums)
        for path, checksums in other.exact.items():
            added.exact.setdefault(path, {}).update(checksums)
        added.packages.update(other.packages)

        return added

    def __iadd__(self, other: DbStore) -> DbStore:
        if self.__class__ != other.__class__:
            raise ValueError("Can only add DbStore to another DbStore")

        for path, checksums in other.similar.items():
            self.similar.setdefault(path, {}).update(checksums)
        for path, checksums in other.exact.items():
            self.exact.setdefault(path, {}).update(checksums)
        self.packages.update(other.packages)

        return self

    @property
    def empty(self):
        return not (self.similar or self.exact or self.packages)

    def expire_old(self) -> None:
        # We keep entries for three years because Debian releases come out
        # approximately annually and are supported for approximately two years,
        # so the odds are extremely good that a new package version is released
        # in the interim. But it's OK if that doesn't happen for all packages
        # since we fall back on re-parsing the changelog on disk when
        # necessary, which will "re-up" things for another three years.
        three_years_ago = int(time.time() - 365.25 * 3)
        self._expire_dict(self.packages, three_years_ago)
        # Don't modify while iterating
        for path, checksums in list(self.similar.items()):
            self._expire_dict(checksums, three_years_ago)
            if not self.similar[path]:
                del self.similar[path]
        for path, checksums in list(self.exact.items()):
            self._expire_dict(checksums, three_years_ago)
            if not self.exact[path]:
                del self.exact[path]

    @staticmethod
    def _expire_dict(dct: dict[Any, int], cutoff: int) -> None:
        # Don't modify while iterating
        for key in list(dct.keys()):
            if dct[key] < cutoff:
                del dct[key]


class SeenDb:
    __slots__ = ('path', '_changes', '_d')

    '''Tracker for seen changelog entries, by content and by path
    Maintained in memory during program execution, potentially persisted to
    disk with pickle between invocations.
    '''

    def __init__(self, path: str | None = None) -> None:
        '''Initialize seen DB, possibly from disk
        If path is None or does not exist on disk then an empty DB is
        initialized. Otherwise content is loaded from disk.'''
        self.path = path
        self._changes = DbStore()
        if not path:
            self._d = DbStore()
            return
        self._d = self.read_database()

    def read_database(self) -> DbStore:
        assert self.path
        try:
            # pylint: disable=consider-using-with
            f = open(self.path, 'rb')
        except FileNotFoundError:
            return DbStore()
        try:
            db = cast(DbStore, pickle.load(f))
            if isinstance(db, dict):
                # This database format was only ever used in experimental
                # versions of this program, so (a) this code will be removed
                # once the program is shipped past experimental and (b)
                # therefore I'm not troubling my translators with translating
                # this string.
                # This is also why I'm not bothering with conversion code.
                debug(f'Database {self.path} has obsolete format, discarding')
                return DbStore()
            return db
        except Exception as ex:
            raise DbError(_("Database %(db)s failed to load: %(errmsg)s")
                          % {'db': self.path, 'errmsg': str(ex)}) from ex
        finally:
            f.close()

    def seen_here(self, entry: ChangelogEntry, exact: bool = True) -> bool:
        '''True if the entry has been seen in this path before'''
        index = self._d.exact if exact else self._d.similar
        checksum = self._checksum(entry, exact)
        return checksum in index.get(entry.path, {})

    def seen_elsewhere(self, entry: ChangelogEntry,
                       exact: bool = True) -> bool:
        '''True if the entry has been seen in a different path before'''
        index = self._d.exact if exact else self._d.similar
        checksum = self._checksum(entry, exact)
        return any(checksum in checksums
                   for path, checksums in index.items()
                   if path != entry.path)

    def seen_anywhere(self, entry: ChangelogEntry,
                      exact: bool = True) -> bool:
        '''True if the entry has been seen in any path before'''
        index = self._d.exact if exact else self._d.similar
        checksum = self._checksum(entry, exact)
        return any(checksum in checksums
                   for path, checksums in index.items())

    def has_package(self, package: str) -> bool:
        '''True if the specified package has been seen before'''
        return package in self._d.packages

    def add_package(self, package: str) -> None:
        '''Add the specified package to the database if not already there'''
        if package in self._d.packages:
            return
        now = int(time.time())
        self._d.packages[package] = now
        self._changes.packages[package] = now

    def add(self, entry: ChangelogEntry) -> None:
        '''Add the specified entry to the database if not already there'''
        if self.seen_here(entry):
            return

        now = int(time.time())

        self.add_package(entry.package)

        checksum = self._checksum(entry)
        self._d.exact.setdefault(entry.path, {})[checksum] = now
        self._changes.exact.setdefault(entry.path, {})[checksum] = now

        checksum = self._checksum(entry, False)
        self._d.similar.setdefault(entry.path, {})[checksum] = now
        self._changes.similar.setdefault(entry.path, {})[checksum] = now

    @staticmethod
    def _checksum(entry: ChangelogEntry, exact: bool = True) -> bytes:
        content = str(entry) if exact else (entry.content + entry.trailer)
        return hashlib.md5(content.strip().encode()).digest()

    def _expire_old(self) -> None:
        self._d.expire_old()

    def save_as(self, path: str, force: bool = False) -> None:
        '''Save database to specified path rather than default location
        Raises an exception if file already exists unless force is True'''
        with open(path, 'wb' if force else 'xb') as f:
            db = self._d + self._changes
            db.expire_old()
            pickle.dump(db, f)

    def apply_changes(self) -> None:
        if not self.path:
            return
        if self._changes.empty:
            return
        oldname = f'{self.path}-old'
        newname = f'{self.path}-new'
        # We have to create the file in order to lock it!
        reread_db = os.path.exists(self.path)
        with open(self.path, 'ab') as lockable:
            lockable.seek(0)
            os.lockf(lockable.fileno(), os.F_LOCK, 0)
            # In case somebody else modified it while we were working
            if reread_db:
                self._d = self.read_database()
            else:
                self._d = DbStore()
            self._d += self._changes
            self._changes = DbStore()
            self._expire_old()
            with open(newname, 'wb') as f:
                pickle.dump(self._d, f)
            try:
                os.unlink(oldname)
            except FileNotFoundError:
                pass
            os.link(self.path, oldname)
            os.rename(newname, self.path)

    @staticmethod
    def _dump_line(prefix: str, key: str | bytes, stamp: int) -> str:
        if isinstance(key, bytes):
            key = "".join((f"{c:0{2}x}" for c in key))
        return (f'{prefix}{key} {stamp} '
                f'({datetime.datetime.fromtimestamp(stamp).strftime("%F")})')

    def dump(self) -> None:
        print('packages:')
        for package, stamp in sorted(self._d.packages.items()):
            print(self._dump_line('  ', package, stamp))

        print('exact checksums:')
        for path, checksums in sorted(self._d.exact.items()):
            print(f'  {path}:')
            for checksum, stamp in sorted(checksums.items()):
                print(self._dump_line('    ', checksum, stamp))

        print('similar checksums:')
        for path, checksums in sorted(self._d.similar.items()):
            print(f'  {path}:')
            for checksum, stamp in sorted(checksums.items()):
                print(self._dump_line('    ', checksum, stamp))
