Source code for gala.dynamics.util

""" General dynamics utilities. """

# Third-party
import astropy.units as u
import astropy.coordinates as coord
from astropy.utils.misc import isiterable
import numpy as np
from scipy.signal import argrelmax, argrelmin

# This package
from .core import PhaseSpacePosition
from ..util import atleast_2d

__all__ = ['peak_to_peak_period', 'estimate_dt_n_steps', 'combine']


[docs]def peak_to_peak_period(t, f, amplitude_threshold=1E-2): """ Estimate the period of the input time series by measuring the average peak-to-peak time. Parameters ---------- t : array_like Time grid aligned with the input time series. f : array_like A periodic time series. amplitude_threshold : numeric (optional) A tolerance parameter. Fails if the mean amplitude of oscillations isn't larger than this tolerance. Returns ------- period : float The mean peak-to-peak period. """ if hasattr(t, 'unit'): t_unit = t.unit t = t.value else: t_unit = u.dimensionless_unscaled # find peaks max_ix = argrelmax(f, mode='wrap')[0] max_ix = max_ix[(max_ix != 0) & (max_ix != (len(f)-1))] # find troughs min_ix = argrelmin(f, mode='wrap')[0] min_ix = min_ix[(min_ix != 0) & (min_ix != (len(f)-1))] # neglect minor oscillations if abs(np.mean(f[max_ix]) - np.mean(f[min_ix])) < amplitude_threshold: return np.nan # compute mean peak-to-peak if len(max_ix) > 0: T_max = np.mean(t[max_ix[1:]] - t[max_ix[:-1]]) else: T_max = np.nan # now compute mean trough-to-trough if len(min_ix) > 0: T_min = np.mean(t[min_ix[1:]] - t[min_ix[:-1]]) else: T_min = np.nan # then take the mean of these two return np.mean([T_max, T_min]) * t_unit
def _autodetermine_initial_dt(w0, H, dE_threshold=1E-9, **integrate_kwargs): if w0.shape and w0.shape[0] > 1: raise ValueError("Only one set of initial conditions may be passed " "in at a time.") if dE_threshold is None: return 1. dts = np.logspace(-3, 1, 8)[::-1] _base_n_steps = 1000 for dt in dts: n_steps = int(round(_base_n_steps / dt)) orbit = H.integrate_orbit(w0, dt=dt, n_steps=n_steps, **integrate_kwargs) E = orbit.energy() dE = np.abs((E[-1] - E[0]) / E[0]).value if dE < dE_threshold: break return dt
[docs]def estimate_dt_n_steps(w0, hamiltonian, n_periods, n_steps_per_period, dE_threshold=1E-9, func=np.nanmax, **integrate_kwargs): """ Estimate the timestep and number of steps to integrate an orbit for given its initial conditions and a potential object. Parameters ---------- w0 : `~gala.dynamics.PhaseSpacePosition`, array_like Initial conditions. potential : :class:`~gala.potential.PotentialBase` The potential to integrate the orbit in. n_periods : int Number of (max) orbital periods to integrate for. n_steps_per_period : int Number of steps to take per (max) orbital period. dE_threshold : numeric (optional) Maximum fractional energy difference -- used to determine initial timestep. Set to ``None`` to ignore this. func : callable (optional) Determines which period to use. By default, this takes the maximum period using :func:`~numpy.nanmax`. Other options could be :func:`~numpy.nanmin`, :func:`~numpy.nanmean`, :func:`~numpy.nanmedian`. Returns ------- dt : float The timestep. n_steps : int The number of timesteps to integrate for. """ if not isinstance(w0, PhaseSpacePosition): w0 = np.asarray(w0) w0 = PhaseSpacePosition.from_w(w0, units=hamiltonian.units) from ..potential import Hamiltonian hamiltonian = Hamiltonian(hamiltonian) # integrate orbit dt = _autodetermine_initial_dt(w0, hamiltonian, dE_threshold=dE_threshold, **integrate_kwargs) n_steps = int(round(10000 / dt)) orbit = hamiltonian.integrate_orbit(w0, dt=dt, n_steps=n_steps, **integrate_kwargs) # if loop, align circulation with Z and take R period circ = orbit.circulation() if np.any(circ): orbit = orbit.align_circulation_with_z(circulation=circ) cyl = orbit.represent_as(coord.CylindricalRepresentation) # convert to cylindrical coordinates R = cyl.rho.value phi = cyl.phi.value z = cyl.z.value T = np.array([peak_to_peak_period(orbit.t, f).value for f in [R, phi, z]])*orbit.t.unit else: T = np.array([peak_to_peak_period(orbit.t, f).value for f in orbit.pos])*orbit.t.unit # timestep from number of steps per period T = func(T) if np.isnan(T): raise RuntimeError("Failed to find period.") T = T.decompose(hamiltonian.units).value dt = T / float(n_steps_per_period) n_steps = int(round(n_periods * T / dt)) if dt == 0. or dt < 1E-13: raise ValueError("Timestep is zero or very small!") return dt, n_steps
[docs]def combine(objs): """Combine the specified `~gala.dynamics.PhaseSpacePosition` or `~gala.dynamics.Orbit` objects. Parameters ---------- objs : iterable An iterable of either `~gala.dynamics.PhaseSpacePosition` or `~gala.dynamics.Orbit` objects. """ from .orbit import Orbit # have to special-case this because they are iterable if isinstance(objs, PhaseSpacePosition) or isinstance(objs, Orbit): raise ValueError("You must pass a non-empty iterable to combine.") elif not isiterable(objs) or len(objs) < 1: raise ValueError("You must pass a non-empty iterable to combine.") elif len(objs) == 1: # short circuit return objs[0] # We only support these two types to combine: if objs[0].__class__ not in [PhaseSpacePosition, Orbit]: raise TypeError("Objects must be either PhaseSpacePosition or Orbit " "instances.") # Validate objects: # - check type # - check dimensionality # - check frame, potential # - Right now, we only support Cartesian for obj in objs: # Check to see if they are all the same type of object: if obj.__class__ != objs[0].__class__: raise TypeError("All objects must have the same type.") # Make sure they have same dimensionality if obj.ndim != objs[0].ndim: raise ValueError("All objects must have the same ndim.") # Check that all objects have the same reference frame if obj.frame != objs[0].frame: raise ValueError("All objects must have the same frame.") # Check that (for orbits) they all have the same potential if hasattr(obj, 'potential') and obj.potential != objs[0].potential: raise ValueError("All objects must have the same potential.") # For orbits, time arrays must be the same if (hasattr(obj, 't') and obj.t is not None and objs[0].t is not None and not u.allclose(obj.t, objs[0].t, atol=1E-13*objs[0].t.unit)): raise ValueError("All orbits must have the same time array.") if 'cartesian' not in obj.pos.get_name(): raise NotImplementedError("Currently, combine only works for " "Cartesian-represented objects.") # Now we prepare the positions, velocities: if objs[0].__class__ == PhaseSpacePosition: pos = [] vel = [] for i, obj in enumerate(objs): if i == 0: pos_unit = obj.pos.xyz.unit vel_unit = obj.vel.d_xyz.unit pos.append(atleast_2d(obj.pos.xyz.to(pos_unit).value, insert_axis=1)) vel.append(atleast_2d(obj.vel.d_xyz.to(vel_unit).value, insert_axis=1)) pos = np.concatenate(pos, axis=1) * pos_unit vel = np.concatenate(vel, axis=1) * vel_unit return PhaseSpacePosition(pos=pos, vel=vel, frame=objs[0].frame) elif objs[0].__class__ == Orbit: pos = [] vel = [] for i, obj in enumerate(objs): if i == 0: pos_unit = obj.pos.xyz.unit vel_unit = obj.vel.d_xyz.unit p = obj.pos.xyz.to(pos_unit).value v = obj.vel.d_xyz.to(vel_unit).value if p.ndim < 3: p = p.reshape(p.shape + (1,)) v = v.reshape(v.shape + (1,)) pos.append(p) vel.append(v) pos = np.concatenate(pos, axis=2) * pos_unit vel = np.concatenate(vel, axis=2) * vel_unit return Orbit(pos=pos, vel=vel, t=objs[0].t, frame=objs[0].frame, potential=objs[0].potential) else: raise RuntimeError("should never get here...")