Coverage for tcvx21/grillix_post/lineouts/lineout_m.py: 96%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Defines an implementation for lineouts based on curves in the poloidal plane, which are typically close to orthogonal
3to magnetic flux surfaces
4"""
5import numpy as np
6import xarray as xr
8from scipy.interpolate.fitpack2 import RectBivariateSpline
9from .csr_linear_interpolation_m import make_matrix_interp
10from tcvx21.units_m import Quantity
11from .parametric_spline_m import ParametricSpline
14def on_grid_interpolator(grid) -> RectBivariateSpline:
15 """
16 Returns an interpolator which returns 1.0 if an arbitrary (R, Z) position is on the grid, and 0.0 if not
17 """
18 on_grid = np.nan_to_num(grid.shape(xr.DataArray(np.ones_like(grid.r_u), dims='points')), nan=0.0)
19 return RectBivariateSpline(grid.r_s, grid.z_s, on_grid.T, kx=1, ky=1)
22class Lineout:
23 """
24 A "lineout" which allows you to map from grid points to an arbitrary (R, Z) poloidal line or curve
25 """
27 def __init__(self, r_points: np.ndarray, z_points: np.ndarray,
28 periodic: bool = False, smoothing: float = 0.0):
29 """
30 Initialises the Lineout object from a specified set of r and z points
32 If smoothing if > 0.0, then the input points will be smoothed. (Typically smoothing required << 1, recommend
33 to check the fit)
34 """
35 r_points, z_points = np.array(r_points), np.array(z_points)
37 assert r_points.ndim == 1
38 assert r_points.shape == z_points.shape
39 self.r_source, self.z_source = r_points, z_points
41 # Default use a third order spline, unless not enough points are given in which case drop the order
42 order = min(3, r_points.size - 1)
44 self.curve = ParametricSpline(sample_points=[r_points, z_points], periodic=periodic, smoothing=smoothing,
45 order=order)
47 def find_points_on_grid(self, grid, n_points,
48 test_points: int = 1000, epsilon: float = 1E-6, trim_points: bool = False):
49 """
50 Finds 'n_points' equally spaced (R, Z) points along the lineout
52 Note that 'curve' must have a single segment on the grid -- this algorithm will fail if there is an off grid
53 segment between two on-grid segments
55 n_points * precision test points will be used to check which values of the parameter are on the grid
56 epsilon shifts the endpoints slightly inwards to ensure that the returned points are actually on the grid
57 """
58 on_grid = on_grid_interpolator(grid)
60 t_tests = np.linspace(self.curve.t_min, self.curve.t_max, num=test_points, endpoint=True)
62 test_on_grid = on_grid(*self.curve(t_tests), grid=False)
63 t_on_grid = t_tests[np.where(test_on_grid > 0.99)]
65 # Find the endpoints
66 t_min, t_max = np.min(t_on_grid), np.max(t_on_grid)
68 if trim_points:
69 # Trim the lineout to exclude the first 5% and last 5% of the data, since we shouldn't take statistics in
70 # the boundary conditions (pinned values have extreme skew and kurtosis)
71 t_min += 0.1 * (t_max-t_min)
72 t_max -= 0.1 * (t_max-t_min)
74 # Find the points inside the interval
75 t_points = np.linspace(t_min + epsilon, t_max - epsilon, num=n_points)
76 r_points, z_points = self.curve(t_points)
78 # Make sure that all of the returned points are on the grid. min should be close to 1.0, but allow edge points
79 assert np.min(on_grid(r_points, z_points, grid=False)) > 0.5, f"get_n_points_on_grid returned >1 off-grid point.\
80 Check that the lineout has a single segment on the grid"
82 self.r_points, self.z_points = r_points, z_points
83 self.setup_interpolation_matrix(grid)
85 def setup_interpolation_matrix(self, grid, use_source_points: bool = False):
86 """
87 Adds attributes required for interpolation
88 """
89 if use_source_points:
90 self.r_points = self.r_source
91 self.z_points = self.z_source
93 self.interpolation_matrix = make_matrix_interp(grid.r_u, grid.z_u, self.r_points, self.z_points)
94 self.grid_size, self.curve_size = grid.size, self.r_points.size
96 def interpolate_1d(self, input_array: np.ndarray):
97 """
98 Applies the interpolation matrix to an array with dimension 'points'
99 """
100 return self.interpolation_matrix * input_array
103 def interpolate(self, input_array: xr.DataArray):
104 """
105 Uses a dask-parallelized algorithm to apply the csr-matrix interpolation over the input array
107 See http://xarray.pydata.org/en/stable/examples/apply_ufunc_vectorize_1d.html for hints on writing
108 ufuncs for xarrays
109 """
111 interpolated = xr.apply_ufunc(
112 # First pass the function
113 self.interpolate_1d,
114 # Next, pass the input array, and chunk it in such a way that dask can neatly parallelize it
115 input_array,
116 # Provide a list of the 'core' dimensions, which are the ones that aren't looped over
117 input_core_dims=[["points"],],
118 # Provide a list of the output dimensions (which replaces input_core_dims)
119 output_core_dims=[["interp_points"]],
120 # Set which dimensions are allowed to change size (doesn't seem to have any effect)
121 # exclude_dims=set(("points",)),
122 # Loop over non-core dims
123 vectorize=True,
124 # Pass keywords to the dask core, to define the size of the returned array
125 # Switch on dask parallelism. Works best if input_array is generated using the dask-based methods
126 dask='parallelized',
127 # Declare the size of the output_core_dims
128 dask_gufunc_kwargs={"output_sizes": {"interp_points": self.curve_size}},
129 # Declare the type of the output
130 output_dtypes=[np.float64]
131 )
133 interpolated.attrs = input_array.attrs
135 return interpolated
138 def poloidal_arc_length(self, norm) -> xr.DataArray:
139 """
140 Calculates the poloidal arc length of a list of (R, Z) points by using simple finite difference
142 The result is the same length as the input array, due to a zero-insertion for the first point
143 """
144 return xr.DataArray(
145 np.cumsum(
146 np.insert(
147 np.sqrt(np.diff(self.r_points) ** 2 + np.diff(self.z_points) ** 2),
148 0, 0.0)
149 )).assign_attrs(norm=('R0' if norm is None else norm['R0']), name='poloidal arc length')
151 def omp_mapped_distance(self, omp_map) -> Quantity:
152 """
153 Calculates the OMP-mapped radial distance
154 """
155 values = omp_map(self.r_points, self.z_points)
156 return values.values * values.norm