# Source code for neural_pipeline.monitoring

"""
Main module for monitoring training process

There is:

* :class:`MonitorHub` - monitors collection for connect all monitors to :class:`Trainer`
* :class:`AbstractMonitor` - basic class for all monitors, that will be connected to :class:`MonitorHub`
* :class:`ConsoleMonitor` - monitor, that used for write epoch results to console
* :class:`LogMonitor` - monitor, used for metrics logging
"""

import json
import os
from abc import ABCMeta
import numpy as np

from neural_pipeline.train_config import MetricsGroup
from neural_pipeline.utils import dict_recursive_bypass
from neural_pipeline.utils.file_structure_manager import FileStructManager, FolderRegistrable

__all__ = ['MonitorHub', 'AbstractMonitor', 'ConsoleMonitor', 'LogMonitor']


class AbstractMonitor(metaclass=ABCMeta):
    """
    Common base for all monitors.

    Every hook defined here is a no-op; concrete monitors override only the ones they need.
    The instance keeps the number of the current epoch in ``epoch_num`` (initially ``0``).
    """

    def __init__(self):
        self.epoch_num = 0

    def set_epoch_num(self, epoch_num: int) -> None:
        """
        Remember the number of the current epoch

        :param epoch_num: number of the current epoch
        """
        self.epoch_num = epoch_num

    def update_metrics(self, metrics: {}) -> None:
        """
        Hook for receiving new metrics values. Does nothing in the base class

        :param metrics: metrics dict with keys 'metrics' and 'groups'
        """

    def update_losses(self, losses: {}) -> None:
        """
        Hook for receiving new losses values. Does nothing in the base class

        :param losses: dict of losses values, keyed by the names of the train pipeline stages
            (e.g. [train, validation])
        """

    @staticmethod
    def _iterate_by_losses(losses: {}, callback: callable) -> None:
        """
        Internal helper that walks over the losses dict in one unified way

        :param losses: dict of losses
        :param callback: callable invoked once per dict entry as
            ``callback(name: str, values: np.ndarray)``
        """
        for stage_name, stage_values in losses.items():
            callback(stage_name, stage_values)

    def register_event(self, text: str) -> None:
        """
        Hook for receiving a text event. Does nothing in the base class

        :param text: event text
        """

    def __enter__(self):
        # The monitor itself is the context object.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release in the base class; returning None lets exceptions propagate.
        return None
class ConsoleMonitor(AbstractMonitor):
    """
    Monitor that prints losses to the console.

    The output looks like: ``Epoch: [#]; train: [-1, 0, 1]; validation: [-1, 0, 1]``.
    The three numbers are the [min, mean, max] of the loss values of the corresponding stage
    """

    class ResStr:
        """
        Small accumulator for the output line.

        Text is collected with :meth:`append`; converting to ``str`` drops the last character
        (the trailing ``;`` of the last appended part).
        """

        def __init__(self, start: str):
            self.res = start

        def append(self, string: str):
            self.res = self.res + string

        def __str__(self):
            return self.res[:-1]

    def update_losses(self, losses: {}) -> None:
        """
        Print one line with the current epoch number and [min, mean, max] of every loss

        :param losses: dict of losses values, keyed by stage name
        """
        line = self.ResStr("Epoch: [{}];".format(self.epoch_num))

        def add_stage(stage: str, values: np.ndarray) -> None:
            # Evaluated in min, mean, max order, exactly as they are printed.
            stats = (np.min(values), np.mean(values), np.max(values))
            line.append(" {}: [{:4f}, {:4f}, {:4f}];".format(stage, *stats))

        self._iterate_by_losses(losses, add_stage)
        print(line)
class LogMonitor(AbstractMonitor, FolderRegistrable):
    """
    Monitor that logs metrics and losses.

    It accumulates the mean value of every metric and loss on each update, and dumps the full
    history as JSON when the monitor is closed. Optionally the last values are also written
    to a separate JSON file (see :meth:`write_final_metrics`).

    The monitor registers the directory ``monitors/metrics_log`` in the file structure manager;
    the full log is named ``metrics_log.json``.

    :param fsm: :class:`FileStructManager` object
    """

    def __init__(self, fsm: FileStructManager):
        super().__init__()

        self._fsm = fsm
        # Registration happens before any path is requested from the manager.
        self._fsm.register_dir(self)

        self._storage = {}
        self._file = self._get_file_name(False)
        self._final_metrics_file = None

    def write_final_metrics(self, path: str = None) -> 'LogMonitor':
        """
        Enable saving of the final metrics to a separate file

        :param path: path to the result file. If not defined, the file is placed next to the
            full metrics log and named 'metrics.json'
        :return: self object
        """
        self._final_metrics_file = self._get_final_file_name(False) if path is None else path
        return self

    # NOTE: the return annotation is kept exactly as in the original signature.
    def get_final_metrics_file(self) -> str or None:
        """
        Get the path of the final metrics file

        :return: path, or None if writing wasn't enabled by :meth:`write_final_metrics`
        """
        return self._final_metrics_file

    def update_metrics(self, metrics: {}) -> None:
        """
        Store the current values of all metrics and metrics groups

        :param metrics: metrics dict with keys 'metrics' and 'groups'
        """
        for single_metric in metrics['metrics']:
            self._process_metric(single_metric)

        for top_group in metrics['groups']:
            # Direct metrics of the group first, then its nested groups; both go under the group name.
            for inner_metric in top_group.metrics():
                self._process_metric(inner_metric, top_group.name())
            for inner_group in top_group.groups():
                self._process_metric(inner_group, top_group.name())

    def update_losses(self, losses: {}) -> None:
        """
        Store the mean of every loss under ``<name>/loss``

        :param losses: dict of losses values, keyed by stage name
        """
        def remember_mean(name: str, values: np.ndarray):
            self._cur_storage([name, 'loss']).append(float(np.mean(values)))

        self._iterate_by_losses(losses, remember_mean)

    def _process_metric(self, cur_metric, parent_tag: str = None) -> None:
        """
        Internal method that stores the mean value of a metric, or of every metric of a group

        Metrics without values are skipped.

        :param cur_metric: :class:`AbstractMetric` or :class:`MetricsGroup` object
        :param parent_tag: parent tag under which the metric is placed in the storage
        """
        if not isinstance(cur_metric, MetricsGroup):
            values = cur_metric.get_values().astype(np.float32)
            if values.size > 0:
                self._cur_storage([parent_tag, cur_metric.name()]).append(float(np.mean(values)))
            return

        # Only the direct metrics of the group are stored here.
        for child in cur_metric.metrics():
            if child.get_values().size <= 0:
                continue
            target = self._cur_storage([parent_tag, cur_metric.name(), child.name()])
            target.append(float(np.mean(child.get_values())))

    def _flush_metrics(self) -> None:
        """
        Write the accumulated storage to the log file and, if enabled, the final metrics file
        """
        log_path = self._get_file_name(True)
        with open(log_path, 'w') as out:
            json.dump(self._storage, out)

        if self._final_metrics_file is None:
            return

        # Presumably maps every stored history list to its last element - confirm against
        # `dict_recursive_bypass`.
        last_values = dict_recursive_bypass(self._storage, lambda v: v[-1])
        with open(self._final_metrics_file, 'w') as out:
            json.dump(last_values, out)

    # NOTE: the annotations are kept exactly as in the original signature.
    def _cur_storage(self, names: [str]) -> [] or {}:
        """
        Get the substorage addressed by a path of names, creating missing levels on the way

        ``None`` entries of the path are skipped. A missing level becomes a dict, except at the
        last position of the path, where it becomes a list.

        :param names: list of names (path to the target substorage)
        :return: substorage
        """
        last_pos = len(names) - 1
        node = self._storage
        for pos, key in enumerate(names):
            if key is not None:
                if key not in node:
                    node[key] = [] if pos >= last_pos else {}
                node = node[key]
        return node

    def close(self) -> None:
        """
        Close the monitor: flush the metrics files
        """
        self._flush_metrics()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _get_file_name(self, create: bool) -> str:
        directory = self._fsm.get_path(self, create)
        return os.path.join(directory, 'metrics_log.json')

    def _get_final_file_name(self, create: bool) -> str:
        directory = self._fsm.get_path(self, create)
        return os.path.join(directory, 'metrics.json')

    def _get_gir(self) -> str:
        return os.path.join('monitors', 'metrics_log')

    def _get_name(self) -> str:
        return 'LogMonitor'
class MonitorHub:
    """
    Aggregator of monitors.

    The hub collects monitors and forwards every call to each of them, in the order they
    were added.
    """

    def __init__(self):
        self.monitors = []

    def set_epoch_num(self, epoch_num: int) -> None:
        """
        Set the current epoch number in all monitors

        :param epoch_num: number of the current epoch
        """
        for m in self.monitors:
            m.set_epoch_num(epoch_num)

    def add_monitor(self, monitor: 'AbstractMonitor') -> 'MonitorHub':
        """
        Connect a monitor to the hub

        :param monitor: :class:`AbstractMonitor` object
        :return: self object
        """
        self.monitors.append(monitor)
        return self

    def update_metrics(self, metrics: {}) -> None:
        """
        Update metrics in all monitors

        :param metrics: metrics dict with keys 'metrics' and 'groups'
        """
        for m in self.monitors:
            m.update_metrics(metrics)

    def update_losses(self, losses: {}) -> None:
        """
        Update losses in all monitors

        :param losses: losses values with keys 'train' and 'validation'
        """
        for m in self.monitors:
            m.update_losses(losses)

    def register_event(self, text: str) -> None:
        """
        Pass a text event to all monitors

        :param text: event text
        """
        for m in self.monitors:
            m.register_event(text)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit all monitors.

        Every monitor gets its ``__exit__`` called even if an earlier one fails, so one broken
        monitor cannot prevent the others from closing. The first error raised by a monitor is
        re-raised after all monitors were processed.

        :return: None, so an exception raised inside the ``with`` body is not suppressed
        """
        first_error = None
        for m in self.monitors:
            try:
                m.__exit__(exc_type, exc_val, exc_tb)
            except Exception as err:
                # Keep going: the remaining monitors still have to be closed.
                if first_error is None:
                    first_error = err

        if first_error is not None:
            raise first_error