Source code for gromacs.fileformats.xvg

# -*- encoding: utf-8 -*-
# GromacsWrapper: formats.py
# Copyright (c) 2009-2012 Oliver Beckstein <orbeckst@gmail.com>
# Released under the GNU Public License 3 (or higher, your choice)
# See the file COPYING for details.
"""
Simple xmgrace XVG file format
==============================

Gromacs produces graphs in the `xmgrace`_ ("xvg") format. These are
simple multi-column data files. The class :class:`XVG` encapsulates
access to such files and adds a number of methods to access the data
(as NumPy arrays), compute aggregates, or quickly plot it.

.. _xmgrace: http://plasma-gate.weizmann.ac.il/Grace/

The :class:`XVG` class is useful beyond reading xvg files. With the
*array* keyword or the :meth:`XVG.set` method one can load data from
an array instead of a file. The array should be simple "NXY" data
(typically: first column time or position, further columns scalar
observables). The data should be a NumPy :class:`numpy.ndarray` array
``a`` with :attr:`~numpy.ndarray.shape` ``(M, N)`` where *M*-1 is the
number of observables and *N* the number of observations, e.g.the
number of time points in a time series. ``a[0]`` is the time or
position and ``a[1:]`` the *M*-1 data columns.


Errors
------

The :attr:`XVG.error` attribute contains the statistical error for
each timeseries. It is computed from the standard deviation of the
fluctuations from the mean and their correlation time. The parameters
for the calculations of the correlation time are set with
:meth:`XVG.set_correlparameters`.

.. SeeAlso:: :func:`numkit.timeseries.tcorrel`


Plotting
--------

The :meth:`XVG.plot` and :meth:`XVG.errorbar` methods are set up to
produce graphs of multiple columns simultaneously. It is
typically assumed that the first column in the selected (sub)array
contains the abscissa ("x-axis") of the graph and all further columns
are plotted against the first one.

Data selection
~~~~~~~~~~~~~~

Plotting from :class:`XVG` is fairly flexible as one can always pass
the *columns* keyword to select which columns are to be
plotted. Assuming that the data contains ``[t, X1, X2, X3]``, then one
can

1) plot all observable columns (X1 to X3) against t::

     xvg.plot()

2) plot only X2 against t::

     xvg.plot(columns=[0,2])

3) plot X2 and X3 against t::

     xvg.plot(columns=[0,2,3])

4) plot X1 against X3::

     xvg.plot(columns=[2,3])


Coarse grainining of data
~~~~~~~~~~~~~~~~~~~~~~~~~

It is also possible to *coarse grain the data* for plotting (which
typically results in visually smoothing the graph because noise is
averaged out).

Currently, two alternative algorithms to produce "coarse grained"
(decimated) graphs are implemented and can be selected with the
*method* keyword for the plotting functions in conjuction with
*maxpoints* (the number of points to be plotted):

1) **mean** histogram (default) --- bin the data (using
   :func:`numkit.timeseries.regularized_function` and compute the
   mean for each bin. Gives the exact number of desired points
   but the time data are whatever the middle of the bin is.

2) **smooth** subsampled --- smooth the data with a running average
   (other windows like Hamming are also possible) and then pick data
   points at a stepsize compatible with the number of data points
   required. Will give exact times but not the exact number of data
   points.

For simple test data, both approaches give very similar output.

For the special case of periodic data such as angles, one can use the
circular mean ("circmean") to coarse grain. In this case, jumps across
the -180º/+180º boundary are added as masked datapoints and no line is
drawn across the jump in the plot. (Only works with the simple
:meth:`XVG.plot` method at the moment, errorbars or range plots are
not implemented yet.)

.. SeeAlso:: :meth:`XVG.decimate`

Examples
--------

In this example we generate a noisy time series of a sine wave. We
store the time, the value, and an error. (In a real example, the
value might be the mean over multiple observations and the error might
be the estimated error of the mean.)

  >>> import numpy as np
  >>> import gromacs.formats
  >>> X = np.linspace(-10,10,50000)
  >>> yerr = np.random.randn(len(X))*0.05
  >>> data = np.vstack((X, np.sin(X) + yerr, np.random.randn(len(X))*0.05))
  >>> xvg = gromacs.formats.XVG(array=data)

Plot value for *all* time points::

  >>> xvg.plot(columns=[0,1], maxpoints=None, color="black", alpha=0.2)

Plot bin-averaged (decimated) data with the errors, over 1000 points::

  >>> xvg.errorbar(maxpoints=1000, color="red")

(see output in Figure :ref:`Plot of Raw vs Decimated data <figure-xvg-decimated-label>`)

.. _figure-xvg-decimated-label:

.. figure:: xvg_decimated.*
   :figwidth: 40%
   :scale: 70%
   :alt: plot of a raw noisy sin(x) graph versus its decimated version
   :align: right

   **Plot of Raw vs Decimated data.** Example of plotting raw data
   (sine on 50,000 points, gray) versus the decimated graph (reduced
   to 1000 points, red line). The errors were also decimated and
   reduced to the errors within the 5% and the 95% percentile. The
   decimation is carried out by histogramming the data in the desired
   number of bins and then the data in each bin is reduced by either
   :func:`numpy.mean` (for the value) or
   :func:`scipy.stats.scoreatpercentile` (for errors).

In principle it is possible to use other functions to decimate the
data. For :meth:`XVG.plot`, the *method* keyword can be changed (see
:meth:`XVG.decimate` for allowed *method* values). For
:meth:`XVG.errorbar`, the method to reduce the data values (typically
column 1) is fixed to "mean" but the errors (typically columns 2 and 3)
can also be reduced with *error_method* = "rms".

If one wants to show the variation of the raw data together with the
decimated and smoothed data then one can plot the percentiles of the
deviation from the mean in each bin:

   >>> xvg.errorbar(columns=[0,1,1], maxpoints=1000, color="blue", demean=True)

The *demean* keyword indicates if fluctuations from the mean are
regularised [#demean]_. The method :meth:`XVG.plot_coarsened`
automates this approach and can plot coarsened data selected by the
*columns* keyword.

.. rubric:: Footnotes

.. [#demean] When *error_method* = "percentile" is selected for
             :meth:`XVG.errorbar` then *demean* does not actually
             force a regularisation of the fluctuations from the
             mean. Instead, the (symmetric) percentiles are computed
             on the full data and the error ranges for plotting are
             directly set to the percentiles. In this way one can
             easily plot the e.g. 10th-percentile to 90th-percentile
             band (using keyword *percentile* = 10).



Classes and functions
---------------------

.. autoclass:: XVG
   :members:

.. autofunction:: break_array

"""

import os
import errno
import re
import warnings
from itertools import cycle
import logging

import numpy
import matplotlib.cm, matplotlib.colors
from matplotlib import pyplot as plt

import numkit.timeseries

from gromacs.exceptions import (
    ParseError,
    MissingDataError,
    MissingDataWarning,
    AutoCorrectionWarning,
)
import gromacs.utilities as utilities
import gromacs.collections


[docs] class XVG(utilities.FileUtils): """Class that represents the numerical data in a grace xvg file. All data must be numerical. :const:`nan` and :const:`inf` values are supported via python's :func:`float` builtin function. The :attr:`~XVG.array` attribute can be used to access the the array once it has been read and parsed. The :attr:`~XVG.ma` attribute is a numpy masked array (good for plotting). Conceptually, the file on disk and the XVG instance are considered the same data. Whenever the filename for I/O (:meth:`XVG.read` and :meth:`XVG.write`) is changed then the filename associated with the instance is also changed to reflect the association between file and instance. With the *permissive* = ``True`` flag one can instruct the file reader to skip unparseable lines. In this case the line numbers of the skipped lines are stored in :attr:`XVG.corrupted_lineno`. A number of attributes are defined to give quick access to simple statistics such as - :attr:`~XVG.mean`: mean of all data columns - :attr:`~XVG.std`: standard deviation - :attr:`~XVG.min`: minimum of data - :attr:`~XVG.max`: maximum of data - :attr:`~XVG.error`: error on the mean, taking correlation times into account (see also :meth:`XVG.set_correlparameters`) - :attr:`~XVG.tc`: correlation time of the data (assuming a simple exponential decay of the fluctuations around the mean) These attributes are numpy arrays that correspond to the data columns in :attr:`XVG.array`, i.e. ``XVG.array[1:]``. .. Note:: - Only simple XY or NXY files are currently supported, *not* Grace files that contain multiple data sets separated by '&'. - Any kind of formatting (i.e. :program:`xmgrace` commands) is discarded. """ #: Default extension of XVG files. default_extension = "xvg" #: Aim for plotting around that many points maxpoints_default = 10000 # logger: for pickling to work, this *must* be class-level and # cannot be done in __init__() (because we cannot pickle self.logger) logger = logging.getLogger("gromacs.formats.XVG") #: If :attr:`XVG.savedata` is ``False`` then any attributes in #: :attr:`XVG.__pickle_excluded` are *not* pickled as they are but simply #: pickled with the default value. __pickle_excluded = { "__array": None } # note class name un-mangling in __getstate__()! #: Default color cycle for :meth:`XVG.plot_coarsened`: #: ``['black', 'red', 'blue', 'orange', 'magenta', 'cyan', 'yellow', 'brown', 'green']`` default_color_cycle = [ "black", "red", "blue", "orange", "magenta", "cyan", "yellow", "brown", "green", ] def __init__( self, filename=None, names=None, array=None, permissive=False, **kwargs ): """Initialize the class from a xvg file. :Arguments: *filename* is the xvg file; it can only be of type XY or NXY. If it is supplied then it is read and parsed when :attr:`XVG.array` is accessed. *names* optional labels for the columns (currently only written as comments to file); string with columns separated by commas or a list of strings *array* read data from *array* (see :meth:`XVG.set`) *permissive* ``False`` raises a :exc:`ValueError` and logs and errior when encountering data lines that it cannot parse. ``True`` ignores those lines and logs a warning---this is a risk because it might read a corrupted input file [``False``] *stride* Only read every *stride* line of data [1]. *savedata* ``True`` includes the data (:attr:`XVG.array`` and associated caches) when the instance is pickled (see :mod:`pickle`); this is oftens not desirable because the data are already on disk (the xvg file *filename*) and the resulting pickle file can become very big. ``False`` omits those data from a pickle. [``False``] *metadata* dictionary of metadata, which is not touched by the class """ self.__array = None # cache for array (BIG) (used by XVG.array) self.__cache = {} # cache for computed results self.savedata = kwargs.pop("savedata", False) if filename is not None: self._init_filename( filename ) # note: reading data from file is delayed until required if names is None: self.names = [] else: try: self.names = names.split(",") except AttributeError: self.names = names self.metadata = kwargs.pop("metadata", {}) # reserved for user data self.permissive = permissive self.stride = kwargs.pop("stride", 1) self.corrupted_lineno = None # must parse() first before this makes sense # default number of data points for calculating correlation times via FFT self.ncorrel = kwargs.pop("ncorrel", 25000) self.__correlkwargs = {} # see set_correlparameters() if array is not None: self.set(array)
[docs] def read(self, filename=None): """Read and parse xvg file *filename*.""" self._init_filename(filename) self.parse()
[docs] def write(self, filename=None): """Write array to xvg file *filename* in NXY format. .. Note:: Only plain files working at the moment, not compressed. """ self._init_filename(filename) with utilities.openany(self.real_filename, "w") as xvg: xvg.write( "# xmgrace compatible NXY data file\n" "# Written by gromacs.formats.XVG()\n" ) xvg.write("# :columns: {0!r}\n".format(self.names)) for xyy in self.array.T: xyy.tofile( xvg, sep=" ", format="%-8s" ) # quick and dirty ascii output...--no compression! xvg.write("\n")
@property def array(self): """Represent xvg data as a (cached) numpy array. The array is returned with column-first indexing, i.e. for a data file with columns X Y1 Y2 Y3 ... the array a will be a[0] = X, a[1] = Y1, ... . """ if self.__array is None: self.parse() return self.__array @property def ma(self): """Represent data as a masked array. The array is returned with column-first indexing, i.e. for a data file with columns X Y1 Y2 Y3 ... the array a will be a[0] = X, a[1] = Y1, ... . inf and nan are filtered via :func:`numpy.isfinite`. """ a = self.array return numpy.ma.MaskedArray(a, mask=numpy.logical_not(numpy.isfinite(a))) @property def mean(self): """Mean value of all data columns.""" return self.array[1:].mean(axis=1) @property def std(self): """Standard deviation from the mean of all data columns.""" return self.array[1:].std(axis=1) @property def min(self): """Minimum of the data columns.""" return self.array[1:].min(axis=1) @property def max(self): """Maximum of the data columns.""" return self.array[1:].max(axis=1) def _tcorrel(self, nstep=100, **kwargs): """Correlation "time" of data. The 0-th column of the data is interpreted as a time and the decay of the data is computed from the autocorrelation function (using FFT). .. SeeAlso:: :func:`numkit.timeseries.tcorrel` """ t = self.array[0, ::nstep] r = gromacs.collections.Collection( [ numkit.timeseries.tcorrel(t, Y, nstep=1, **kwargs) for Y in self.array[1:, ::nstep] ] ) return r
[docs] def set_correlparameters(self, **kwargs): """Set and change the parameters for calculations with correlation functions. The parameters persist until explicitly changed. :Keywords: *nstep* only process every *nstep* data point to speed up the FFT; if left empty a default is chosen that produces roughly 25,000 data points (or whatever is set in *ncorrel*) *ncorrel* If no *nstep* is supplied, aim at using *ncorrel* data points for the FFT; sets :attr:`XVG.ncorrel` [25000] *force* force recalculating correlation data even if cached values are available *kwargs* see :func:`numkit.timeseries.tcorrel` for other options .. SeeAlso: :attr:`XVG.error` for details and references. """ self.ncorrel = kwargs.pop("ncorrel", self.ncorrel) or 25000 nstep = kwargs.pop("nstep", None) if nstep is None: # good step size leads to ~25,000 data points nstep = len(self.array[0]) / float(self.ncorrel) nstep = int(numpy.ceil(nstep)) # catch small data sets kwargs["nstep"] = nstep self.__correlkwargs.update( kwargs ) # only contains legal kw for numkit.timeseries.tcorrel or force return self.__correlkwargs
def _correlprop(self, key, **kwargs): kwargs = self.set_correlparameters(**kwargs) if not self.__cache.get("tcorrel", None) or kwargs.pop("force", False): self.__cache["tcorrel"] = self._tcorrel(**kwargs) return numpy.array(self.__cache["tcorrel"].get(key).tolist()) @property def error(self): """Error on the mean of the data, taking the correlation time into account. See [FrenkelSmit2002]_ `p526`_: error = sqrt(2*tc*acf[0]/T) where acf() is the autocorrelation function of the fluctuations around the mean, y-<y>, tc is the correlation time, and T the total length of the simulation. .. [FrenkelSmit2002] D. Frenkel and B. Smit, Understanding Molecular Simulation. Academic Press, San Diego 2002 .. _p526: http://books.google.co.uk/books?id=XmyO2oRUg0cC&pg=PA526 """ return self._correlprop("sigma") @property def tc(self): """Correlation time of the data. See :meth:`XVG.error` for details. """ return self._correlprop("tc")
[docs] def parse(self, stride=None): """Read and cache the file as a numpy array. Store every *stride* line of data; if ``None`` then the class default is used. The array is returned with column-first indexing, i.e. for a data file with columns X Y1 Y2 Y3 ... the array a will be a[0] = X, a[1] = Y1, ... . """ if stride is None: stride = self.stride self.corrupted_lineno = [] irow = 0 # count rows of data # cannot use numpy.loadtxt() because xvg can have two types of 'comment' lines with utilities.openany(self.real_filename) as xvg: rows = [] ncol = None for lineno, line in enumerate(xvg): line = line.strip() if len(line) == 0: continue if "label" in line and "xaxis" in line: self.xaxis = line.split('"')[-2] if "label" in line and "yaxis" in line: self.yaxis = line.split('"')[-2] if line.startswith("@ legend"): if not "legend" in self.metadata: self.metadata["legend"] = [] self.metadata["legend"].append(line.split("legend ")[-1]) if line.startswith("@ s") and "subtitle" not in line: name = line.split("legend ")[-1].replace('"', "").strip() self.names.append(name) if line.startswith(("#", "@")): continue if line.startswith("&"): raise NotImplementedError( "{0!s}: Multi-data not supported, only simple NXY format.".format( self.real_filename ) ) # parse line as floats try: row = [float(el) for el in line.split()] except: if self.permissive: self.logger.warning( "%s: SKIPPING unparsable line %d: %r", self.real_filename, lineno + 1, line, ) self.corrupted_lineno.append(lineno + 1) continue self.logger.error( "%s: Cannot parse line %d: %r", self.real_filename, lineno + 1, line, ) raise # check for same number of columns as in previous step if ncol is not None and len(row) != ncol: if self.permissive: self.logger.warning( "%s: SKIPPING line %d with wrong number of columns: %r", self.real_filename, lineno + 1, line, ) self.corrupted_lineno.append(lineno + 1) continue errmsg = ( "{0!s}: Wrong number of columns in line {1:d}: {2!r}".format( self.real_filename, lineno + 1, line ) ) self.logger.error(errmsg) raise IOError(errno.ENODATA, errmsg, self.real_filename) # finally: a good line if irow % stride == 0: ncol = len(row) rows.append(row) irow += 1 try: self.__array = numpy.array(rows).transpose() # cache result except: self.logger.error( "%s: Failed reading XVG file, possibly data corrupted. " "Check the last line of the file...", self.real_filename, ) raise finally: del rows # try to clean up as well as possible as it can be massively big
def to_df(self): import pandas as _pd return _pd.DataFrame( self.array.T, columns=[self.xaxis] + (self.names if len(self.names) else [self.yaxis]), dtype=float, )
[docs] def set(self, a): """Set the *array* data from *a* (i.e. completely replace). No sanity checks at the moment... """ self.__array = numpy.asarray(a)
def _get_colors(self, color, columns): try: cmap = matplotlib.colormaps[color] colors = cmap( matplotlib.colors.Normalize()( numpy.arange(len(columns[1:]), dtype=float) ) ) except (TypeError, ValueError): colors = cycle(utilities.asiterable(color)) return colors
[docs] def plot(self, **kwargs): """Plot xvg file data. The first column of the data is always taken as the abscissa X. Additional columns are plotted as ordinates Y1, Y2, ... In the special case that there is only a single column then this column is plotted against the index, i.e. (N, Y). :Keywords: *columns* : list Select the columns of the data to be plotted; the list is used as a numpy.array extended slice. The default is to use all columns. Columns are selected *after* a transform. *transform* : function function ``transform(array) -> array`` which transforms the original array; must return a 2D numpy array of shape [X, Y1, Y2, ...] where X, Y1, ... are column vectors. By default the transformation is the identity [``lambda x: x``]. *maxpoints* : int limit the total number of data points; matplotlib has issues processing png files with >100,000 points and pdfs take forever to display. Set to ``None`` if really all data should be displayed. At the moment we simply decimate the data at regular intervals. [10000] *method* method to decimate the data to *maxpoints*, see :meth:`XVG.decimate` for details *color* single color (used for all plots); sequence of colors (will be repeated as necessary); or a matplotlib colormap (e.g. "jet", see :mod:`matplotlib.cm`). The default is to use the :attr:`XVG.default_color_cycle`. *ax* plot into given axes or create new one if ``None`` [``None``] *kwargs* All other keyword arguments are passed on to :func:`matplotlib.pyplot.plot`. :Returns: *ax* axes instance """ columns = kwargs.pop("columns", Ellipsis) # slice for everything maxpoints = kwargs.pop("maxpoints", self.maxpoints_default) transform = kwargs.pop( "transform", lambda x: x ) # default is identity transformation method = kwargs.pop("method", "mean") ax = kwargs.pop("ax", None) if columns is Ellipsis or columns is None: columns = numpy.arange(self.array.shape[0]) if len(columns) == 0: raise MissingDataError("plot() needs at least one column of data") if len(self.array.shape) == 1 or self.array.shape[0] == 1: # special case: plot against index; plot would do this automatically but # we'll just produce our own xdata and pretend that this was X all along a = numpy.ravel(self.array) X = numpy.arange(len(a)) a = numpy.vstack((X, a)) columns = [0] + [c + 1 for c in columns] else: a = self.array colors = self._get_colors( kwargs.pop("color", self.default_color_cycle), columns ) if ax is None: ax = plt.gca() # (decimate/smooth o slice o transform)(array) a = self.decimate( method, numpy.asarray(transform(a))[columns], maxpoints=maxpoints ) # now deal with infs, nans etc AFTER all transformations (needed for plotting across inf/nan) ma = numpy.ma.MaskedArray(a, mask=numpy.logical_not(numpy.isfinite(a))) # finally plot (each column separately to catch empty sets) for column, color in zip(range(1, len(columns)), colors): if len(ma[column]) == 0: warnings.warn( "No data to plot for column {column:d}".format(**vars()), category=MissingDataWarning, ) kwargs["color"] = color ax.plot(ma[0], ma[column], **kwargs) # plot all other columns in parallel return ax
[docs] def plot_coarsened(self, **kwargs): """Plot data like :meth:`XVG.plot` with the range of **all** data shown. Data are reduced to *maxpoints* (good results are obtained with low values such as 100) and the actual range of observed data is plotted as a translucent error band around the mean. Each column in *columns* (except the abscissa, i.e. the first column) is decimated (with :meth:`XVG.decimate`) and the range of data is plotted alongside the mean using :meth:`XVG.errorbar` (see for arguments). Additional arguments: :Kewords: *maxpoints* number of points (bins) to coarsen over *color* single color (used for all plots); sequence of colors (will be repeated as necessary); or a matplotlib colormap (e.g. "jet", see :mod:`matplotlib.cm`). The default is to use the :attr:`XVG.default_color_cycle`. *method* Method to coarsen the data. See :meth:`XVG.decimate` The *demean* keyword has no effect as it is required to be ``True``. .. SeeAlso:: :meth:`XVG.plot`, :meth:`XVG.errorbar` and :meth:`XVG.decimate` """ ax = kwargs.pop("ax", None) columns = kwargs.pop("columns", Ellipsis) # slice for everything if columns is Ellipsis or columns is None: columns = numpy.arange(self.array.shape[0]) if len(columns) < 2: raise MissingDataError( "plot_coarsened() assumes that there is at least one column " "of data for the abscissa and one or more for the ordinate." ) colors = self._get_colors( kwargs.pop("color", self.default_color_cycle), columns ) if ax is None: ax = plt.gca() t = columns[0] kwargs["demean"] = True kwargs["ax"] = ax for column, color in zip(columns[1:], colors): kwargs["color"] = color self.errorbar(columns=[t, column, column], **kwargs) return ax
[docs] def errorbar(self, **kwargs): """errorbar plot for a single time series with errors. Set *columns* keyword to select [x, y, dy] or [x, y, dx, dy], e.g. ``columns=[0,1,2]``. See :meth:`XVG.plot` for details. Only a single timeseries can be plotted and the user needs to select the appropriate columns with the *columns* keyword. By default, the data are decimated (see :meth:`XVG.plot`) for the default of *maxpoints* = 10000 by averaging data in *maxpoints* bins. x,y,dx,dy data can plotted with error bars in the x- and y-dimension (use *filled* = ``False``). For x,y,dy use *filled* = ``True`` to fill the region between y±dy. *fill_alpha* determines the transparency of the fill color. *filled* = ``False`` will draw lines for the error bars. Additional keywords are passed to :func:`pylab.errorbar`. By default, the errors are decimated by plotting the 5% and 95% percentile of the data in each bin. The percentile can be changed with the *percentile* keyword; e.g. *percentile* = 1 will plot the 1% and 99% perentile (as will *percentile* = 99). The *error_method* keyword can be used to compute errors as the root mean square sum (*error_method* = "rms") across each bin instead of percentiles ("percentile"). The value of the keyword *demean* is applied to the decimation of error data alone. .. SeeAlso:: :meth:`XVG.plot` lists keywords common to both methods. """ ax = kwargs.pop("ax", None) color = kwargs.pop("color", "black") filled = kwargs.pop("filled", True) fill_alpha = kwargs.pop("fill_alpha", 0.2) kwargs.setdefault("capsize", 0) kwargs.setdefault("elinewidth", 1) kwargs.setdefault("ecolor", color) kwargs.setdefault("alpha", 0.3) kwargs.setdefault("fmt", None) columns = kwargs.pop("columns", Ellipsis) # slice for everything maxpoints = kwargs.pop("maxpoints", self.maxpoints_default) transform = kwargs.pop( "transform", lambda x: x ) # default is identity transformation method = kwargs.pop("method", "mean") if method != "mean": raise NotImplementedError("For errors only method == 'mean' is supported.") error_method = kwargs.pop( "error_method", "percentile" ) # can also use 'rms' and 'error' percentile = numpy.abs(kwargs.pop("percentile", 95.0)) demean = kwargs.pop("demean", False) # order: (decimate/smooth o slice o transform)(array) try: data = numpy.asarray(transform(self.array))[columns] except IndexError: raise MissingDataError( "columns {0!r} are not suitable to index the transformed array, possibly not eneough data".format( columns ) ) if data.shape[-1] == 0: raise MissingDataError("There is no data to be plotted.") a = numpy.zeros((data.shape[0], maxpoints), dtype=numpy.float64) a[0:2] = self.decimate("mean", data[0:2], maxpoints=maxpoints) error_data = numpy.vstack((data[0], data[2:])) if error_method == "percentile": if percentile > 50: upper_per = percentile lower_per = 100 - percentile else: upper_per = 100 - percentile lower_per = percentile # demean generally does not make sense with the percentiles (but for analysing # the regularised data itself we use this as a flag --- see below!) upper = a[2:] = self.decimate( "percentile", error_data, maxpoints=maxpoints, per=upper_per, demean=False, )[1:] lower = self.decimate( "percentile", error_data, maxpoints=maxpoints, per=lower_per, demean=False, )[1:] else: a[2:] = self.decimate( error_method, error_data, maxpoints=maxpoints, demean=demean )[1:] lower = None # now deal with infs, nans etc AFTER all transformations (needed for plotting across inf/nan) ma = numpy.ma.MaskedArray(a, mask=numpy.logical_not(numpy.isfinite(a))) if lower is not None: mlower = numpy.ma.MaskedArray( lower, mask=numpy.logical_not(numpy.isfinite(lower)) ) # finally plot X = ma[0] # abscissa set separately Y = ma[1] try: kwargs["yerr"] = ma[3] kwargs["xerr"] = ma[2] except IndexError: try: kwargs["yerr"] = ma[2] except IndexError: raise TypeError( "Either too few columns selected or data does not have a error column" ) if ax is None: ax = plt.gca() if filled: # can only plot dy if error_method == "percentile": if demean: # signal that we are looking at percentiles of an observable and not error y1 = mlower[-1] y2 = kwargs["yerr"] else: # percentiles of real errors (>0) y1 = Y - mlower[-1] y2 = Y + kwargs["yerr"] else: y1 = Y - kwargs["yerr"] y2 = Y + kwargs["yerr"] ax.fill_between(X, y1, y2, color=color, alpha=fill_alpha) else: if error_method == "percentile": # errorbars extend to different lengths; if demean: kwargs["yerr"] = numpy.vstack((mlower[-1], kwargs["yerr"])) else: kwargs["yerr"] = numpy.vstack((Y - mlower[-1], Y + kwargs["yerr"])) try: # xerr only makes sense when the data is a real # error so we don't even bother with demean=? kwargs["xerr"] = numpy.vstack((X - mlower[0], X + kwargs["xerr"])) except (KeyError, IndexError): pass ax.errorbar(X, Y, **kwargs) # clean up args for plot for kw in "yerr", "xerr", "capsize", "ecolor", "elinewidth", "fmt": kwargs.pop(kw, None) kwargs["alpha"] = 1.0 ax.plot(X, Y, color=color, **kwargs) return ax
[docs] def decimate(self, method, a, maxpoints=10000, **kwargs): """Decimate data *a* to *maxpoints* using *method*. If *a* is a 1D array then it is promoted to a (2, N) array where the first column simply contains the index. If the array contains fewer than *maxpoints* points or if *maxpoints* is ``None`` then it is returned as it is. The default for *maxpoints* is 10000. Valid values for the reduction *method*: * "mean", uses :meth:`XVG.decimate_mean` to coarse grain by averaging the data in bins along the time axis * "circmean", uses :meth:`XVG.decimate_circmean` to coarse grain by calculating the circular mean of the data in bins along the time axis. Use additional keywords *low* and *high* to set the limits. Assumes that the data are in degrees. * "min" and "max* select the extremum in each bin * "rms", uses :meth:`XVG.decimate_rms` to coarse grain by computing the root mean square sum of the data in bins along the time axis (for averaging standard deviations and errors) * "percentile" with keyword *per*: :meth:`XVG.decimate_percentile` reduces data in each bin to the percentile *per* * "smooth", uses :meth:`XVG.decimate_smooth` to subsample from a smoothed function (generated with a running average of the coarse graining step size derived from the original number of data points and *maxpoints*) :Returns: numpy array ``(M', N')`` from a ``(M', N)`` array with ``M' == M`` (except when ``M == 1``, see above) and ``N' <= N`` (``N'`` is *maxpoints*). """ methods = { "mean": self.decimate_mean, "min": self.decimate_min, "max": self.decimate_max, "smooth": self.decimate_smooth, "rms": self.decimate_rms, "percentile": self.decimate_percentile, "error": self.decimate_error, # undocumented, not working well "circmean": self.decimate_circmean, } if len(a.shape) == 1: # add first column as index # (probably should do this in class/init anyway...) X = numpy.arange(len(a)) a = numpy.vstack([X, a]) ny = a.shape[-1] # assume 1D or 2D array with last dimension varying fastest if maxpoints is None or ny <= maxpoints: return a return methods[method](a, maxpoints, **kwargs)
[docs] def decimate_mean(self, a, maxpoints, **kwargs): """Return data *a* mean-decimated on *maxpoints*. Histograms each column into *maxpoints* bins and calculates the weighted average in each bin as the decimated data, using :func:`numkit.timeseries.mean_histogrammed_function`. The coarse grained time in the first column contains the centers of the histogram time. If *a* contains <= *maxpoints* then *a* is simply returned; otherwise a new array of the same dimensions but with a reduced number of *maxpoints* points is returned. .. Note:: Assumes that the first column is time. """ return self._decimate( numkit.timeseries.mean_histogrammed_function, a, maxpoints, **kwargs )
[docs] def decimate_circmean(self, a, maxpoints, **kwargs): """Return data *a* circmean-decimated on *maxpoints*. Histograms each column into *maxpoints* bins and calculates the weighted circular mean in each bin as the decimated data, using :func:`numkit.timeseries.circmean_histogrammed_function`. The coarse grained time in the first column contains the centers of the histogram time. If *a* contains <= *maxpoints* then *a* is simply returned; otherwise a new array of the same dimensions but with a reduced number of *maxpoints* points is returned. Keywords *low* and *high* can be used to set the boundaries. By default they are [-pi, +pi]. This method returns a **masked** array where jumps are flagged by an insertion of a masked point. .. Note:: Assumes that the first column is time and that the data are in **degrees**. .. Warning:: Breaking of arrays only works properly with a two-column array because breaks are only inserted in the x-column (a[0]) where y1 = a[1] has a break. """ a_rad = numpy.vstack((a[0], numpy.deg2rad(a[1:]))) b = self._decimate( numkit.timeseries.circmean_histogrammed_function, a_rad, maxpoints, **kwargs ) y_ma, x_ma = break_array(b[1], threshold=numpy.pi, other=b[0]) v = [y_ma] for y in b[2:]: v.append(break_array(y, threshold=numpy.pi)[0]) if v[-1].shape != v[0].shape: raise ValueError( "y dimensions have different breaks: you MUST deal with them separately" ) return numpy.vstack((x_ma, numpy.rad2deg(v)))
[docs] def decimate_min(self, a, maxpoints, **kwargs): """Return data *a* min-decimated on *maxpoints*. Histograms each column into *maxpoints* bins and calculates the minimum in each bin as the decimated data, using :func:`numkit.timeseries.min_histogrammed_function`. The coarse grained time in the first column contains the centers of the histogram time. If *a* contains <= *maxpoints* then *a* is simply returned; otherwise a new array of the same dimensions but with a reduced number of *maxpoints* points is returned. .. Note:: Assumes that the first column is time. """ return self._decimate( numkit.timeseries.min_histogrammed_function, a, maxpoints, **kwargs )
[docs] def decimate_max(self, a, maxpoints, **kwargs): """Return data *a* max-decimated on *maxpoints*. Histograms each column into *maxpoints* bins and calculates the maximum in each bin as the decimated data, using :func:`numkit.timeseries.max_histogrammed_function`. The coarse grained time in the first column contains the centers of the histogram time. If *a* contains <= *maxpoints* then *a* is simply returned; otherwise a new array of the same dimensions but with a reduced number of *maxpoints* points is returned. .. Note:: Assumes that the first column is time. """ return self._decimate( numkit.timeseries.max_histogrammed_function, a, maxpoints, **kwargs )
[docs] def decimate_rms(self, a, maxpoints, **kwargs): """Return data *a* rms-decimated on *maxpoints*. Histograms each column into *maxpoints* bins and calculates the root mean square sum in each bin as the decimated data, using :func:`numkit.timeseries.rms_histogrammed_function`. The coarse grained time in the first column contains the centers of the histogram time. If *a* contains <= *maxpoints* then *a* is simply returned; otherwise a new array of the same dimensions but with a reduced number of *maxpoints* points is returned. .. Note:: Assumes that the first column is time. """ return self._decimate( numkit.timeseries.rms_histogrammed_function, a, maxpoints, **kwargs )
[docs] def decimate_percentile(self, a, maxpoints, **kwargs): """Return data *a* percentile-decimated on *maxpoints*. Histograms each column into *maxpoints* bins and calculates the percentile *per* in each bin as the decimated data, using :func:`numkit.timeseries.percentile_histogrammed_function`. The coarse grained time in the first column contains the centers of the histogram time. If *a* contains <= *maxpoints* then *a* is simply returned; otherwise a new array of the same dimensions but with a reduced number of *maxpoints* points is returned. .. Note:: Assumes that the first column is time. :Keywords: *per* percentile as a percentage, e.g. 75 is the value that splits the data into the lower 75% and upper 25%; 50 is the median [50.0] .. SeeAlso:: :func:`numkit.timeseries.regularized_function` with :func:`scipy.stats.scoreatpercentile` """ return self._decimate( numkit.timeseries.percentile_histogrammed_function, a, maxpoints, **kwargs )
[docs] def decimate_error(self, a, maxpoints, **kwargs): """Return data *a* error-decimated on *maxpoints*. Histograms each column into *maxpoints* bins and calculates an error estimate in each bin as the decimated data, using :func:`numkit.timeseries.error_histogrammed_function`. The coarse grained time in the first column contains the centers of the histogram time. If *a* contains <= *maxpoints* then *a* is simply returned; otherwise a new array of the same dimensions but with a reduced number of *maxpoints* points is returned. .. SeeAlso:: :func:`numkit.timeseries.tcorrel` .. Note:: Assumes that the first column is time. Does not work very well because often there are too few datapoints to compute a good autocorrelation function. """ warnings.warn( "Using undocumented decimate_error() is highly EXPERIMENTAL", category=gromacs.exceptions.LowAccuracyWarning, ) return self._decimate( numkit.timeseries.error_histogrammed_function, a, maxpoints, **kwargs )
def _decimate(self, func, a, maxpoints, **kwargs): ny = a.shape[-1] # assume 2D array with last dimension varying fastest out = numpy.zeros((a.shape[0], maxpoints), dtype=float) t = a[0] for i in range(1, a.shape[0]): # compute regularised data for each column separately out[i], out[0] = func(t, a[i], bins=maxpoints, **kwargs) if ( maxpoints == self.maxpoints_default ): # only warn if user did not set maxpoints try: funcname = func.func_name # Python 2 except AttributeError: funcname = func.__name__ # Python 3 warnings.warn( "Plot had %d datapoints > maxpoints = %d; decimated to %d regularly " "spaced points from the histogrammed data with %s()." % (ny, maxpoints, maxpoints, funcname), category=AutoCorrectionWarning, ) return out
[docs] def decimate_smooth(self, a, maxpoints, window="flat"): """Return smoothed data *a* decimated on approximately *maxpoints* points. 1. Produces a smoothed graph using the smoothing window *window*; "flat" is a running average. 2. select points at a step size approximatelt producing maxpoints If *a* contains <= *maxpoints* then *a* is simply returned; otherwise a new array of the same dimensions but with a reduced number of points (close to *maxpoints*) is returned. .. Note:: Assumes that the first column is time (which will *never* be smoothed/averaged), except when the input array *a* is 1D and therefore to be assumed to be data at equidistance timepoints. TODO: - Allow treating the 1st column as data """ ny = a.shape[-1] # assume 1D or 2D array with last dimension varying fastest # reduce size by averaging oover stepsize steps and then just # picking every stepsize data points. (primitive --- can # leave out bits at the end or end up with almost twice of # maxpoints) stepsize = int(ny / maxpoints) if stepsize % 2 == 0: stepsize += 1 # must be odd for the running average/smoothing window out = numpy.empty_like(a) # smoothed out[0, :] = a[0] for i in range(1, a.shape[0]): # process columns because smooth() only handles 1D arrays :-p out[i, :] = numkit.timeseries.smooth(a[i], stepsize, window=window) if ( maxpoints == self.maxpoints_default ): # only warn if user did not set maxpoints warnings.warn( "Plot had %d datapoints > maxpoints = %d; decimated to %d regularly " "spaced points with smoothing (%r) over %d steps." % (ny, maxpoints, ny / stepsize, window, stepsize), category=AutoCorrectionWarning, ) return out[..., ::stepsize]
def __getstate__(self): """custom pickling protocol: http://docs.python.org/library/pickle.html If :attr:`XVG.savedata` is ``False`` then any attributes in :attr:`XVG.__pickle_excluded` are *not* pickled as they are but simply pickled with the default value. """ if self.savedata: d = self.__dict__ else: # do not pickle the big array cache mangleprefix = "_" + self.__class__.__name__ def demangle(k): """_XVG__array --> __array""" if k.startswith(mangleprefix): k = k.replace(mangleprefix, "") return k d = {} for k in self.__dict__: d[k] = self.__pickle_excluded.get(demangle(k), self.__dict__[k]) return d def __setstate__(self, d): # compatibility with older (pre 0.1.13) pickled instances if not "savedata" in d: wmsg = "Reading pre 0.1.13 pickle file: setting savedata=False" warnings.warn(wmsg, category=DeprecationWarning) self.logger.warning(wmsg) d["savedata"] = False # new default self.__dict__.update(d)
[docs] def break_array(a, threshold=numpy.pi, other=None): """Create a array which masks jumps >= threshold. Extra points are inserted between two subsequent values whose absolute difference differs by more than threshold (default is pi). Other can be a secondary array which is also masked according to *a*. Returns (*a_masked*, *other_masked*) (where *other_masked* can be ``None``) """ assert len(a.shape) == 1, "Only 1D arrays supported" if other is not None and a.shape != other.shape: raise ValueError("arrays must be of identical shape") # jump occurs after the index in break breaks = numpy.where(numpy.abs(numpy.diff(a)) >= threshold)[0] # insert a blank after breaks += 1 # is this needed?? -- no, but leave it here as a reminder # f2 = numpy.diff(a, 2) # up = (f2[breaks - 1] >= 0) # >0: up, <0: down # sort into up and down breaks: # breaks_up = breaks[up] # breaks_down = breaks[~up] # new array b including insertions for all the breaks m = len(breaks) b = numpy.empty((len(a) + m)) # calculate new indices for breaks in b, taking previous insertions into account b_breaks = breaks + numpy.arange(m) mask = numpy.zeros_like(b, dtype=bool) mask[b_breaks] = True b[~mask] = a b[mask] = numpy.nan if other is not None: c = numpy.empty_like(b) c[~mask] = other c[mask] = numpy.nan ma_c = numpy.ma.array(c, mask=mask) else: ma_c = None return numpy.ma.array(b, mask=mask), ma_c