Coverage for tcvx21/grillix_post/lineouts/lineout_m.py: 96%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

48 statements  

1""" 

2Defines an implementation for lineouts based on curves in the poloidal plane, which are typically close to orthogonal 

3to magnetic flux surfaces 

4""" 

5import numpy as np 

6import xarray as xr 

7 

8from scipy.interpolate.fitpack2 import RectBivariateSpline 

9from .csr_linear_interpolation_m import make_matrix_interp 

10from tcvx21.units_m import Quantity 

11from .parametric_spline_m import ParametricSpline 

12 

13 

14def on_grid_interpolator(grid) -> RectBivariateSpline: 

15 """ 

16 Returns an interpolator which returns 1.0 if an arbitrary (R, Z) position is on the grid, and 0.0 if not 

17 """ 

18 on_grid = np.nan_to_num(grid.shape(xr.DataArray(np.ones_like(grid.r_u), dims='points')), nan=0.0) 

19 return RectBivariateSpline(grid.r_s, grid.z_s, on_grid.T, kx=1, ky=1) 

20 

21 

22class Lineout: 

23 """ 

24 A "lineout" which allows you to map from grid points to an arbitrary (R, Z) poloidal line or curve 

25 """ 

26 

27 def __init__(self, r_points: np.ndarray, z_points: np.ndarray, 

28 periodic: bool = False, smoothing: float = 0.0): 

29 """ 

30 Initialises the Lineout object from a specified set of r and z points 

31  

32 If smoothing if > 0.0, then the input points will be smoothed. (Typically smoothing required << 1, recommend 

33 to check the fit) 

34 """ 

35 r_points, z_points = np.array(r_points), np.array(z_points) 

36 

37 assert r_points.ndim == 1 

38 assert r_points.shape == z_points.shape 

39 self.r_source, self.z_source = r_points, z_points 

40 

41 # Default use a third order spline, unless not enough points are given in which case drop the order 

42 order = min(3, r_points.size - 1) 

43 

44 self.curve = ParametricSpline(sample_points=[r_points, z_points], periodic=periodic, smoothing=smoothing, 

45 order=order) 

46 

47 def find_points_on_grid(self, grid, n_points, 

48 test_points: int = 1000, epsilon: float = 1E-6, trim_points: bool = False): 

49 """ 

50 Finds 'n_points' equally spaced (R, Z) points along the lineout 

51 

52 Note that 'curve' must have a single segment on the grid -- this algorithm will fail if there is an off grid 

53 segment between two on-grid segments 

54 

55 n_points * precision test points will be used to check which values of the parameter are on the grid 

56 epsilon shifts the endpoints slightly inwards to ensure that the returned points are actually on the grid 

57 """ 

58 on_grid = on_grid_interpolator(grid) 

59 

60 t_tests = np.linspace(self.curve.t_min, self.curve.t_max, num=test_points, endpoint=True) 

61 

62 test_on_grid = on_grid(*self.curve(t_tests), grid=False) 

63 t_on_grid = t_tests[np.where(test_on_grid > 0.99)] 

64 

65 # Find the endpoints 

66 t_min, t_max = np.min(t_on_grid), np.max(t_on_grid) 

67 

68 if trim_points: 

69 # Trim the lineout to exclude the first 5% and last 5% of the data, since we shouldn't take statistics in 

70 # the boundary conditions (pinned values have extreme skew and kurtosis) 

71 t_min += 0.1 * (t_max-t_min) 

72 t_max -= 0.1 * (t_max-t_min) 

73 

74 # Find the points inside the interval 

75 t_points = np.linspace(t_min + epsilon, t_max - epsilon, num=n_points) 

76 r_points, z_points = self.curve(t_points) 

77 

78 # Make sure that all of the returned points are on the grid. min should be close to 1.0, but allow edge points 

79 assert np.min(on_grid(r_points, z_points, grid=False)) > 0.5, f"get_n_points_on_grid returned >1 off-grid point.\ 

80 Check that the lineout has a single segment on the grid" 

81 

82 self.r_points, self.z_points = r_points, z_points 

83 self.setup_interpolation_matrix(grid) 

84 

85 def setup_interpolation_matrix(self, grid, use_source_points: bool = False): 

86 """ 

87 Adds attributes required for interpolation 

88 """ 

89 if use_source_points: 

90 self.r_points = self.r_source 

91 self.z_points = self.z_source 

92 

93 self.interpolation_matrix = make_matrix_interp(grid.r_u, grid.z_u, self.r_points, self.z_points) 

94 self.grid_size, self.curve_size = grid.size, self.r_points.size 

95 

96 def interpolate_1d(self, input_array: np.ndarray): 

97 """ 

98 Applies the interpolation matrix to an array with dimension 'points' 

99 """ 

100 return self.interpolation_matrix * input_array 

101 

102 

103 def interpolate(self, input_array: xr.DataArray): 

104 """ 

105 Uses a dask-parallelized algorithm to apply the csr-matrix interpolation over the input array 

106 

107 See http://xarray.pydata.org/en/stable/examples/apply_ufunc_vectorize_1d.html for hints on writing 

108 ufuncs for xarrays 

109 """ 

110 

111 interpolated = xr.apply_ufunc( 

112 # First pass the function 

113 self.interpolate_1d, 

114 # Next, pass the input array, and chunk it in such a way that dask can neatly parallelize it 

115 input_array, 

116 # Provide a list of the 'core' dimensions, which are the ones that aren't looped over 

117 input_core_dims=[["points"],], 

118 # Provide a list of the output dimensions (which replaces input_core_dims) 

119 output_core_dims=[["interp_points"]], 

120 # Set which dimensions are allowed to change size (doesn't seem to have any effect) 

121 # exclude_dims=set(("points",)), 

122 # Loop over non-core dims 

123 vectorize=True, 

124 # Pass keywords to the dask core, to define the size of the returned array 

125 # Switch on dask parallelism. Works best if input_array is generated using the dask-based methods 

126 dask='parallelized', 

127 # Declare the size of the output_core_dims 

128 dask_gufunc_kwargs={"output_sizes": {"interp_points": self.curve_size}}, 

129 # Declare the type of the output 

130 output_dtypes=[np.float64] 

131 ) 

132 

133 interpolated.attrs = input_array.attrs 

134 

135 return interpolated 

136 

137 

138 def poloidal_arc_length(self, norm) -> xr.DataArray: 

139 """ 

140 Calculates the poloidal arc length of a list of (R, Z) points by using simple finite difference 

141 

142 The result is the same length as the input array, due to a zero-insertion for the first point 

143 """ 

144 return xr.DataArray( 

145 np.cumsum( 

146 np.insert( 

147 np.sqrt(np.diff(self.r_points) ** 2 + np.diff(self.z_points) ** 2), 

148 0, 0.0) 

149 )).assign_attrs(norm=('R0' if norm is None else norm['R0']), name='poloidal arc length') 

150 

151 def omp_mapped_distance(self, omp_map) -> Quantity: 

152 """ 

153 Calculates the OMP-mapped radial distance 

154 """ 

155 values = omp_map(self.r_points, self.z_points) 

156 return values.values * values.norm