Source code for gala.dynamics.util
# coding: utf-8
""" General dynamics utilities. """
from __future__ import division, print_function
__author__ = "adrn <adrn@astro.columbia.edu>"
# Third-party
import astropy.units as u
import astropy.coordinates as coord
import numpy as np
from scipy.signal import argrelmax, argrelmin
# This package
from .core import CartesianPhaseSpacePosition
from ..integrate import LeapfrogIntegrator
__all__ = ['peak_to_peak_period', 'estimate_dt_n_steps']
[docs]def peak_to_peak_period(t, f, amplitude_threshold=1E-2):
"""
Estimate the period of the input time series by measuring the average
peak-to-peak time.
Parameters
----------
t : array_like
Time grid aligned with the input time series.
f : array_like
A periodic time series.
amplitude_threshold : numeric (optional)
A tolerance parameter. Fails if the mean amplitude of oscillations
isn't larger than this tolerance.
Returns
-------
period : float
The mean peak-to-peak period.
"""
if hasattr(t, 'unit'):
t_unit = t.unit
t = t.value
else:
t_unit = u.dimensionless_unscaled
# find peaks
max_ix = argrelmax(f, mode='wrap')[0]
max_ix = max_ix[(max_ix != 0) & (max_ix != (len(f)-1))]
# find troughs
min_ix = argrelmin(f, mode='wrap')[0]
min_ix = min_ix[(min_ix != 0) & (min_ix != (len(f)-1))]
# neglect minor oscillations
if abs(np.mean(f[max_ix]) - np.mean(f[min_ix])) < amplitude_threshold:
return np.nan
# compute mean peak-to-peak
if len(max_ix) > 0:
T_max = np.mean(t[max_ix[1:]] - t[max_ix[:-1]])
else:
T_max = np.nan
# now compute mean trough-to-trough
if len(min_ix) > 0:
T_min = np.mean(t[min_ix[1:]] - t[min_ix[:-1]])
else:
T_min = np.nan
# then take the mean of these two
return np.mean([T_max, T_min]) * t_unit
def _autodetermine_initial_dt(w0, potential, dE_threshold=1E-9, Integrator=LeapfrogIntegrator):
if w0.shape[0] > 1:
raise ValueError("Only one set of initial conditions may be passed in at a time.")
if dE_threshold is None:
return 1.
dts = np.logspace(-3, 1, 8)[::-1]
_base_n_steps = 1000
for dt in dts:
n_steps = int(round(_base_n_steps / dt))
orbit = potential.integrate_orbit(w0, dt=dt, n_steps=n_steps, Integrator=Integrator)
E = orbit.energy()
dE = np.abs((E[-1] - E[0]) / E[0]).value
if dE < dE_threshold:
break
return dt
[docs]def estimate_dt_n_steps(w0, potential, n_periods, n_steps_per_period, dE_threshold=1E-9,
func=np.nanmax):
"""
Estimate the timestep and number of steps to integrate an orbit for
given its initial conditions and a potential object.
Parameters
----------
w0 : `~gala.dynamics.PhaseSpacePosition`, array_like
Initial conditions.
potential : :class:`~gala.potential.PotentialBase`
The potential to integrate the orbit in.
n_periods : int
Number of (max) orbital periods to integrate for.
n_steps_per_period : int
Number of steps to take per (max) orbital period.
dE_threshold : numeric (optional)
Maximum fractional energy difference -- used to determine initial timestep.
Set to ``None`` to ignore this.
func : callable (optional)
Determines which period to use. By default, this takes the maximum period using
:func:`~numpy.nanmax`. Other options could be :func:`~numpy.nanmin`,
:func:`~numpy.nanmean`, :func:`~numpy.nanmedian`.
Returns
-------
dt : float
The timestep.
n_steps : int
The number of timesteps to integrate for.
"""
if not isinstance(w0, CartesianPhaseSpacePosition):
w0 = np.asarray(w0)
w0 = CartesianPhaseSpacePosition.from_w(w0, units=potential.units)
# integrate orbit
dt = _autodetermine_initial_dt(w0, potential, dE_threshold=dE_threshold)
n_steps = int(round(10000 / dt))
orbit = potential.integrate_orbit(w0, dt=dt, n_steps=n_steps)
# if loop, align circulation with Z and take R period
circ = orbit.circulation()
if np.any(circ):
orbit = orbit.align_circulation_with_z(circulation=circ)
cyl,_ = orbit.represent_as(coord.CylindricalRepresentation) # ignore velocity return
# convert to cylindrical coordinates
R = cyl.rho.value
phi = cyl.phi.value
z = cyl.z.value
T = np.array([peak_to_peak_period(orbit.t, f).value for f in [R, phi, z]])*orbit.t.unit
else:
T = np.array([peak_to_peak_period(orbit.t, f).value for f in orbit.pos])*orbit.t.unit
# timestep from number of steps per period
T = func(T)
if np.isnan(T):
raise RuntimeError("Failed to find period.")
T = T.decompose(potential.units).value
dt = T / float(n_steps_per_period)
n_steps = int(round(n_periods * T / dt))
if dt == 0. or dt < 1E-13:
raise ValueError("Timestep is zero or very small!")
return dt, n_steps