Source code for psydac.fem.tensor

# coding: utf-8

"""
We assume here that a tensor space is the product of fem spaces whom basis are
of compact support

"""
from psydac.ddm.mpi import mpi as MPI
    
import numpy as np
import itertools
import h5py
import os

from types import MappingProxyType

from psydac.linalg.stencil   import StencilVectorSpace
from psydac.linalg.kron      import kronecker_solve
from psydac.fem.basic        import FemSpace, FemField
from psydac.fem.splines      import SplineSpace
from psydac.fem.grid         import FemAssemblyGrid
from psydac.fem.partitioning import create_cart, partition_coefficients
from psydac.ddm.cart         import DomainDecomposition, CartDecomposition

from psydac.core.bsplines  import (find_span,
                                   basis_funs,
                                   basis_funs_1st_der,
                                   basis_ders_on_quad_grid,
                                   elements_spans,
                                   cell_index,
                                   basis_ders_on_irregular_grid)

from psydac.core.field_evaluation_kernels import (eval_fields_1d_no_weights,
                                                  eval_fields_1d_irregular_no_weights,
                                                  eval_fields_1d_weighted,
                                                  eval_fields_1d_irregular_weighted,
                                                  eval_fields_2d_no_weights,
                                                  eval_fields_2d_irregular_no_weights,
                                                  eval_fields_2d_weighted,
                                                  eval_fields_2d_irregular_weighted,
                                                  eval_fields_3d_no_weights,
                                                  eval_fields_3d_irregular_no_weights,
                                                  eval_fields_3d_weighted,
                                                  eval_fields_3d_irregular_weighted)

__all__ = ('TensorFemSpace',)

#===============================================================================
[docs] class TensorFemSpace(FemSpace): """ Tensor-product Finite Element space V. Parameters ---------- domain_decomposition : psydac.ddm.cart.DomainDecomposition *spaces : psydac.fem.splines.SplineSpace 1D finite element spaces. coeff_space : psydac.linalg.stencil.StencilVectorSpace or None The vector space to which the coefficients belong (optional). cart : psydac.ddm.CartDecomposition or None Object that contains all information about the Cartesian decomposition of a tensor-product grid of coefficients. dtype : {float, complex} Data type of the coefficients. Notes ----- For now we assume that this tensor-product space can ONLY be constructed from 1D spline spaces. """ def __init__(self, domain_decomposition, *spaces, coeff_space=None, cart=None, dtype=float): assert isinstance(domain_decomposition, DomainDecomposition) assert all(isinstance(s, SplineSpace) for s in spaces) assert dtype in (float, complex) # TODO [YG 10.04.2024]: check if dtype test is too restrictive # Handle optional arguments if cart and coeff_space: raise ValueError("Cannot provide both 'coeff_space' and 'cart' to constructor") elif cart is not None: assert isinstance(cart, CartDecomposition) coeff_space = StencilVectorSpace(cart, dtype=dtype) elif coeff_space is not None: assert isinstance(coeff_space, StencilVectorSpace) cart = coeff_space.cart else: cart = create_cart([domain_decomposition], [spaces])[0] coeff_space = StencilVectorSpace(cart, dtype=dtype) # Store some info self._domain_decomposition = domain_decomposition self._spaces = spaces self._dtype = dtype self._coeff_space = coeff_space self._symbolic_space = None self._refined_space = {} self._interfaces = {} self._interfaces_readonly = MappingProxyType(self._interfaces) # If process does not own space, stop here if coeff_space.parallel and cart.is_comm_null: return # Determine portion of logical domain local to process. # This corresponds to the indices of the first and last elements # owned by the current process, along each direction. self._element_starts = self._coeff_space.cart.domain_decomposition.starts self._element_ends = self._coeff_space.cart.domain_decomposition.ends # Compute limits of eta_0, eta_1, eta_2, etc... in subdomain local to process self._eta_limits = tuple((space.breaks[s], space.breaks[e+1]) for s, e, space in zip(self._element_starts, self._element_ends, self._spaces)) # Local domains for every process self._global_element_starts = domain_decomposition.global_element_starts self._global_element_ends = domain_decomposition.global_element_ends # Extended 1D assembly grids (local to process) along each direction self._assembly_grids = [{} for _ in range(self.ldim)] # Flag: object NOT YET prepared for interpolation self._interpolation_ready = False # Store information about nested grids self.set_refined_space(self.ncells, self) #-------------------------------------------------------------------------- # Abstract interface: read-only attributes #-------------------------------------------------------------------------- # @property # def nquads( self ): # assert self._nquads, "nquads has to be set with self._nquads = nquads" # return self._nquads # @nquads.setter # def nquads(self, value): # self._nquads = value # @property # def quad_grids( self ): # assert self._nquads, "nquads has to be set with self._nquads = nquads" # return tuple({q: gag} for q, gag in zip(self.nquads, self.get_assembly_grids(*self.nquads))) @property def ldim(self): """ Parametric dimension. """ return sum([V.ldim for V in self.spaces]) @property def periodic(self): """ Tuple of booleans: along each logical dimension, say if domain is periodic. :rtype: tuple[bool] """ # [YG, 27.03.2025]: according to the abstract interface of FemSpace, # this property should return a tuple of `ldim` booleans. However, the # spaces in self.spaces seem to be returning a single scalar value. return tuple(V.periodic for V in self.spaces) @property def domain_decomposition(self): return self._domain_decomposition @property def mapping(self): # [YG, 28.03.2025]: not clear why there should be no mapping here... # Clearly this property is never used in Psydac. return None @property def coeff_space(self): """ Vector space of the coefficients (mapping invariant). :rtype: psydac.linalg.stencil.StencilVectorSpace """ return self._coeff_space @property def symbolic_space( self ): return self._symbolic_space @property def interfaces( self ): return self._interfaces_readonly @symbolic_space.setter def symbolic_space( self, symbolic_space ): #assert isinstance(symbolic_space, BasicFunctionSpace) self._symbolic_space = symbolic_space @property def patch_spaces(self): return (self,) @property def component_spaces(self): return (self,) @property def axis_spaces(self): return self._spaces @property def is_multipatch(self): return False @property def is_vector_valued(self): return False #-------------------------------------------------------------------------- # Abstract interface: evaluation methods #--------------------------------------------------------------------------
[docs] def eval_field( self, field, *eta, weights=None): assert isinstance( field, FemField ) assert field.space is self assert len( eta ) == self.ldim if weights: assert weights.space == field.coeffs.space bases = [] index = [] # Necessary if vector coeffs is distributed across processes if not field.coeffs.ghost_regions_in_sync: field.coeffs.update_ghost_regions() # Check if `x` is iterable and loop over elements if isinstance(eta[0], (list, np.ndarray)) and np.ndim(eta[0]) > 0: for dim in range(1, self.ldim): assert len(eta[0]) == len(eta[dim]) res_list = [] for i in range(len(eta[0])): x = [eta[j][i] for j in range(self.ldim)] res_list.append(self.eval_field(field, *x, weights=weights)) return np.array(res_list) for (x, xlim, space) in zip( eta, self.eta_lims, self.spaces ): knots = space.knots degree = space.degree span = find_span( knots, degree, x ) #-------------------------------------------------# # Fix span for boundaries between subdomains # #-------------------------------------------------# # TODO: Use local knot sequence instead of global # # one to get correct span in all situations # #-------------------------------------------------# if x == xlim[1] and x != knots[-1-degree]: span -= 1 #-------------------------------------------------# basis = basis_funs( knots, degree, x, span) # If needed, rescale B-splines to get M-splines if space.basis == 'M': basis *= space.scaling_array[span-degree : span+1] # Determine local span wrap_x = space.periodic and x > xlim[1] loc_span = span - space.nbasis if wrap_x else span bases.append( basis ) index.append( slice( loc_span-degree, loc_span+1 ) ) # Get contiguous copy of the spline coefficients required for evaluation index = tuple( index ) coeffs = field.coeffs[index].copy() if weights: coeffs *= weights[index] # Evaluation of multi-dimensional spline # TODO: optimize # Option 1: contract indices one by one and store intermediate results # - Pros: small number of Python iterations = ldim # - Cons: we create ldim-1 temporary objects of decreasing size # res = coeffs for basis in bases[::-1]: res = np.dot( res, basis ) # # Option 2: cycle over each element of 'coeffs' (touched only once) # # - Pros: no temporary objects are created # # - Cons: large number of Python iterations = number of elements in 'coeffs' # # # res = 0.0 # for idx,c in np.ndenumerate( coeffs ): # ndbasis = np.prod( [b[i] for i,b in zip( idx, bases )] ) # res += c * ndbasis return res
# ...
[docs] def preprocess_regular_tensor_grid(self, grid, der=0, overlap=0): """Returns all the quantities needed to evaluate fields on a regular tensor-grid. Parameters ---------- grid : List of ndarray List of 2D arrays representing each direction of the grid. Each of these arrays should have shape (ne_xi, nv_xi) where ne_xi is the number of cells in the domain in the direction xi and nv_xi is the number of evaluation points in the same direction. der : int, default=0 Number of derivatives of the basis functions to pre-compute. overlap : int How much to overlap. Only used in the distributed context. Returns ------- degree : tuple of int Degree in each direction global_basis : List of ndarray List of 4D arrays, one per direction, containing the values of the p+1 non-vanishing basis functions (and their derivatives) at each grid point. The array for direction xi has shape (ne_xi, der + 1, p+1, nv_xi). global_spans : List of ndarray List of 1D arrays, one per direction, containing the index of the last non-vanishing basis function in each cell. The array for direction xi has shape (ne_xi,). local_shape : List of tuple Shape of what is local to this instance. """ # Check the grid assert len(grid) == self.ldim # Get the local domain v = self.coeff_space starts, ends = self.local_domain # Add the overlap if we are in parallel if v.parallel: starts = tuple(s - overlap if s!=0 else s for s in starts) ends = tuple(e + overlap for e in ends) # Compute the basis functions and spans. local_shape = [] global_basis = [] global_spans = [] for i in range(self.ldim): # We only care about the local grid grid_local = grid[i][slice(starts[i], ends[i] + 1)] # Compute basis functions and spans global_basis_i = basis_ders_on_quad_grid(self.knots[i], self.degree[i], grid_local, der, self.spaces[i].basis, offset=starts[i]) global_spans_i = elements_spans(self.knots[i], self.degree[i])[slice(starts[i], ends[i] + 1)] - v.starts[i] + v.shifts[i] * v.pads[i] local_shape.append(grid_local.shape) global_basis.append(global_basis_i) global_spans.append(global_spans_i) return self.degree, global_basis, global_spans, local_shape
#...
[docs] def preprocess_irregular_tensor_grid(self, grid, der=0, overlap=0): """Returns all the quantities needed to evaluate fields on a regular tensor-grid. Parameters ---------- grid : List of ndarray List of 1D arrays representing each direction of the grid. der : int, default=0 Number of derivatives of the basis functions to pre-compute. overlap : int How much to overlap. Only used in the distributed context. Returns ------- pads : tuple of int Padding in each direction degree : tuple of int Degree in each direction global_basis : List of ndarray List of 4D arrays, one per direction, containing the values of the p+1 non-vanishing basis functions (and their derivatives) at each grid point. The array for direction xi has shape (n_xi, p+1, der + 1). global_spans : List of ndarray List of 1D arrays, one per direction, containing the index of the last non-vanishing basis function in each cell. The array for direction xi has shape (n_xi,). cell_indexes : list of ndarray List of 1D arrays, one per direction, containing the index of the cell in which the corresponding point in grid is. local_shape : List of tuple Shape of what is local to this instance. """ # Check the grid assert len(grid) == self.ldim # Get the local domain v = self.coeff_space starts, ends = self.local_domain # Add the overlap if we are in parallel if v.parallel: starts = tuple(s - overlap if s!=0 else s for s in starts) ends = tuple(e + overlap for e in ends) # Compute the basis functions and spans and cell indexes. global_basis = [] global_spans = [] cell_indexes = [] local_shape = [] for i in range(self.ldim): # Check the that the grid is sorted. grid_i = grid[i] assert all(grid_i[j] <= grid_i[j + 1] for j in range(len(grid_i) - 1)) # Get the cell indexes cell_index_i = cell_index(self.breaks[i], grid_i) min_idx = np.searchsorted(cell_index_i, starts[i], side='left') max_idx = np.searchsorted(cell_index_i, ends[i], side='right') # We only care about the local cells. cell_index_i = cell_index_i[min_idx:max_idx] grid_local_i = grid_i[min_idx:max_idx] # basis functions and spans global_basis_i = basis_ders_on_irregular_grid(self.knots[i], self.degree[i], grid_local_i, cell_index_i, der, self.spaces[i].basis) global_spans_i = elements_spans(self.knots[i], self.degree[i])[slice(starts[i], ends[i] + 1)] - v.starts[i] + v.shifts[i] * v.pads[i] local_shape.append(len(grid_local_i)) global_basis.append(global_basis_i) global_spans.append(global_spans_i) # starts[i] is cell 0 of the local domain cell_indexes.append(cell_index_i - starts[i]) return self.degree, global_basis, global_spans, cell_indexes, local_shape
# ...
[docs] def eval_fields(self, grid, *fields, weights=None, npts_per_cell=None, overlap=0): """Evaluate one or several fields at the given location(s) grid. Parameters ---------- grid : List of ndarray Grid on which to evaluate the fields *fields : tuple of psydac.fem.basic.FemField Fields to evaluate weights : psydac.fem.basic.FemField or None, optional Weights field. npts_per_cell: int or tuple of int or None, optional number of evaluation points in each cell. If an integer is given, then assume that it is the same in every direction. overlap : int How much to overlap. Only used in the distributed context. Returns ------- List of ndarray of floats List of the evaluated fields. """ assert all(f.space is self for f in fields) for f in fields: # Necessary if vector coeffs is distributed across processes if not f.coeffs.ghost_regions_in_sync: f.coeffs.update_ghost_regions() if weights is not None: assert weights.space is self assert all(f.coeffs.space is weights.coeffs.space for f in fields) if not weights.coeffs.ghost_regions_in_sync: weights.coeffs.update_ghost_regions() assert len(grid) == self.ldim grid = [np.asarray(grid[i]) for i in range(self.ldim)] assert all(grid[i].ndim == grid[i + 1].ndim for i in range(self.ldim - 1)) # -------------------------- # Case 1. Scalar coordinates if (grid[0].size == 1) or grid[0].ndim == 0: if weights is not None: return [self.eval_field(f, *grid, weights=weights.coeffs) for f in fields] else: return [self.eval_field(f, *grid) for f in fields] # Case 2. 1D array of coordinates and no npts_per_cell is given # -> grid is tensor-product, but npts_per_cell is not the same in each cell elif grid[0].ndim == 1 and npts_per_cell is None: out_fields = self.eval_fields_irregular_tensor_grid(grid, *fields, weights=weights, overlap=overlap) return [np.ascontiguousarray(out_fields[..., i]) for i in range(len(fields))] # Case 3. 1D arrays of coordinates and npts_per_cell is a tuple or an integer # -> grid is tensor-product, and each cell has the same number of evaluation points elif grid[0].ndim == 1 and npts_per_cell is not None: if isinstance(npts_per_cell, int): npts_per_cell = (npts_per_cell,) * self.ldim for i in range(self.ldim): ncells_i = len(self.breaks[i]) - 1 grid[i] = np.reshape(grid[i], (ncells_i, npts_per_cell[i])) out_fields = self.eval_fields_regular_tensor_grid(grid, *fields, weights=weights, overlap=overlap) # return a list return [np.ascontiguousarray(out_fields[..., i]) for i in range(len(fields))] # Case 4. (self.ldim)D arrays of coordinates and no npts_per_cell # -> unstructured grid elif grid[0].ndim == self.ldim and npts_per_cell is None: raise NotImplementedError("Unstructured grids are not supported yet.") # Case 5. Nonsensical input else: raise ValueError("This combination of argument isn't understood. The 4 cases understood are :\n" "Case 1. Scalar coordinates\n" "Case 2. 1D array of coordinates and no npts_per_cell is given\n" "Case 3. 1D arrays of coordinates and npts_per_cell is a tuple or an integer\n" "Case 4. {0}D arrays of coordinates and no npts_per_cell".format(self.ldim))
# ...
[docs] def eval_fields_regular_tensor_grid(self, grid, *fields, weights=None, overlap=0): """Evaluate fields on a regular tensor grid Parameters ---------- grid : List of ndarray List of 2D arrays representing each direction of the grid. Each of these arrays should have shape (ne_xi, nv_xi) where ne is the number of cells in the domain in the direction xi and nv_xi is the number of evaluation points in the same direction. *fields : tuple of psydac.fem.basic.FemField Fields to evaluate on `grid`. weights : psydac.fem.basic.FemField or None, optional Weights to apply to our fields. overlap : int How much to overlap. Only used in the distributed context. Returns ------- List of ndarray of float Values of the fields on the regular tensor grid """ degree, global_basis, global_spans, local_shape = self.preprocess_regular_tensor_grid(grid, der=0, overlap=overlap) ncells = [local_shape[i][0] for i in range(self.ldim)] n_eval_points = [local_shape[i][1] for i in range(self.ldim)] out_fields = np.zeros((*(tuple(ncells[i] * n_eval_points[i] for i in range(self.ldim))), len(fields)), dtype=self.dtype) global_arr_coeffs = np.zeros(shape=(*fields[0].coeffs._data.shape, len(fields)), dtype=self.dtype) for i in range(len(fields)): global_arr_coeffs[..., i] = fields[i].coeffs._data if weights is None: args = (*ncells, *degree, *n_eval_points, *global_basis, *global_spans, global_arr_coeffs, out_fields) if self.ldim == 1: eval_fields_1d_no_weights(*args) elif self.ldim == 2: eval_fields_2d_no_weights(*args) elif self.ldim == 3: eval_fields_3d_no_weights(*args) else: raise NotImplementedError(f"eval_fields_{self.ldim}d_no_weights not implemented") else: global_weight_coeffs = weights.coeffs._data args = (*ncells, *degree, *n_eval_points, *global_basis, *global_spans, global_arr_coeffs, global_weight_coeffs, out_fields) if self.ldim == 1: eval_fields_1d_weighted(*args) elif self.ldim == 2: eval_fields_2d_weighted(*args) elif self.ldim == 3: eval_fields_3d_weighted(*args) else: raise NotImplementedError(f"eval_fields_{self.ldim}d_weighted not implemented") return out_fields
# ...
[docs] def eval_fields_irregular_tensor_grid(self, grid, *fields, weights=None, overlap=0): """Evaluate fields on a regular tensor grid Parameters ---------- grid : List of ndarray List of 2D arrays representing each direction of the grid. Each of these arrays should have shape (ne_xi, nv_xi) where ne is the number of cells in the domain in the direction xi and nv_xi is the number of evaluation points in the same direction. *fields : tuple of psydac.fem.basic.FemField Fields to evaluate on `grid`. weights : psydac.fem.basic.FemField or None, optional Weights to apply to our fields. overlap : int How much to overlap. Only used in the distributed context. Returns ------- List of ndarray of float Values of the fields on the regular tensor grid """ degree, global_basis, global_spans, cell_indexes, local_shape = \ self.preprocess_irregular_tensor_grid(grid, overlap=overlap) out_fields = np.zeros(tuple(local_shape) + (len(fields),), dtype=self.dtype) global_arr_coeffs = np.zeros(shape=(*fields[0].coeffs._data.shape, len(fields)), dtype=self.dtype) npoints = local_shape for i in range(len(fields)): global_arr_coeffs[..., i] = fields[i].coeffs._data if weights is None: args = (*npoints, *degree, *cell_indexes, *global_basis, *global_spans, global_arr_coeffs, out_fields) if self.ldim == 1: eval_fields_1d_irregular_no_weights(*args) elif self.ldim == 2: eval_fields_2d_irregular_no_weights(*args) elif self.ldim == 3: eval_fields_3d_irregular_no_weights(*args) else: raise NotImplementedError(f"eval_fields_{self.ldim}d_irregular_no_weights not implemented") else: global_weight_coeffs = weights.coeffs._data args = (*npoints, *degree, *cell_indexes, *global_basis, *global_spans, global_arr_coeffs, global_weight_coeffs, out_fields) if self.ldim == 1: eval_fields_1d_irregular_weighted(*args) elif self.ldim == 2: eval_fields_2d_irregular_weighted(*args) elif self.ldim == 3: eval_fields_3d_irregular_weighted(*args) else: raise NotImplementedError(f"eval_fields_{self.ldim}d_irregular_weighted not implemented") return out_fields
# ...
[docs] def eval_field_gradient(self, field, *eta, weights=None): assert isinstance(field, FemField) assert field.space is self assert len(eta) == self.ldim bases_0 = [] bases_1 = [] index = [] # Check if `x` is iterable and loop over elements if isinstance(eta[0], (list, np.ndarray)) and np.ndim(eta[0]) > 0: for dim in range(1, self.ldim): assert len(eta[0]) == len(eta[dim]) res_list = [] for i in range(len(eta[0])): x = [eta[j][i] for j in range(self.ldim)] res_list.append(self.eval_field_gradient(field, *x, weights=weights)) return np.array(res_list) for (x, xlim, space) in zip( eta, self.eta_lims, self.spaces ): knots = space.knots degree = space.degree span = find_span( knots, degree, x ) #-------------------------------------------------# # Fix span for boundaries between subdomains # #-------------------------------------------------# # TODO: Use local knot sequence instead of global # # one to get correct span in all situations # #-------------------------------------------------# if x == xlim[1] and x != knots[-1-degree]: span -= 1 #-------------------------------------------------# basis_0 = basis_funs(knots, degree, x, span) basis_1 = basis_funs_1st_der(knots, degree, x, span) # If needed, rescale B-splines to get M-splines if space.basis == 'M': scaling = space.scaling_array[span-degree : span+1] basis_0 *= scaling basis_1 *= scaling # Determine local span wrap_x = space.periodic and x > xlim[1] loc_span = span - space.nbasis if wrap_x else span bases_0.append( basis_0 ) bases_1.append( basis_1 ) index.append( slice( loc_span-degree, loc_span+1 ) ) # Get contiguous copy of the spline coefficients required for evaluation index = tuple( index ) coeffs = field.coeffs[index].copy() if weights: coeffs *= weights[index] # Evaluate each component of the gradient using algorithm described in "Option 1" above grad = [] for d in range( self.ldim ): bases = [(bases_1[d] if i==d else bases_0[i]) for i in range( self.ldim )] res = coeffs for basis in bases[::-1]: res = np.dot( res, basis ) grad.append( res ) return grad
# ...
[docs] def integral(self, f, *, nquads=None): assert hasattr(f, '__call__') if nquads is None: nquads = [p + 1 for p in self.degree] elif isinstance(nquads, int): nquads = [nquads] * self.ldim else: nquads = list(nquads) assert all(isinstance(nq, int) for nq in nquads) assert all(nq >= 1 for nq in nquads) # Extract and store quadrature data assembly_grids = self.get_assembly_grids(*nquads) nq = [g.num_quad_pts for g in assembly_grids] points = [g.points for g in assembly_grids] weights = [g.weights for g in assembly_grids] # Get local element range sk = [g.local_element_start for g in assembly_grids] ek = [g.local_element_end for g in assembly_grids] # Iterator over multi-index k (equivalent to nested loops over each dimension) multi_range = lambda starts, ends: \ itertools.product(*[range(s, e+1) for s, e in zip(starts, ends)]) # Shortcut: Numpy product of all elements in a list np_prod = np.prod # Perform Gaussian quadrature in multiple dimensions c = 0.0 for k in multi_range(sk, ek): x = [ points_i[k_i, :] for points_i, k_i in zip( points, k)] w = [weights_i[k_i, :] for weights_i, k_i in zip(weights, k)] for q in np.ndindex(*nq): y = [x_i[q_i] for x_i, q_i in zip(x, q)] v = [w_i[q_i] for w_i, q_i in zip(w, q)] c += f(*y) * np_prod(v) # All reduce (MPI_SUM) if self.coeff_space.parallel: mpi_comm = self.coeff_space.cart.comm c = mpi_comm.allreduce(c) # convert to native python type if numpy to avoid errors with sympify if isinstance(c, np.generic): c = c.item() return c
#-------------------------------------------------------------------------- # Other properties and methods #-------------------------------------------------------------------------- @property def dtype(self): return self._dtype #TODO: return tuple instead of product? @property def nbasis(self): dims = [V.nbasis for V in self.spaces] dim = 1 for d in dims: dim *= d return dim @property def knots(self): return [V.knots for V in self.spaces] @property def breaks(self): return [V.breaks for V in self.spaces] @property def degree(self): return [V.degree for V in self.spaces] @property def multiplicity(self): return [V.multiplicity for V in self.spaces] @property def pads(self): return self.coeff_space.pads @property def ncells(self): return [V.ncells for V in self.spaces] @property def spaces(self): return self._spaces
[docs] def get_assembly_grids(self, *nquads): """ Return a tuple of `FemAssemblyGrid` objects (one for each direction). These objects are local to the process, and contain all 1D information which is necessary for the correct assembly of the l.h.s. matrix and r.h.s. vector in a finite element method. This information includes the coordinates and weights of all quadrature points, as well as the values of the non-zero basis functions, and their derivatives, at such points. The computed `FemAssemblyGrid` objects are stored in a dictionary in `self` with `nquads` as key, and are reused if a match is found. Parameters ---------- *nquads : int Number of quadrature points per cell, along each direction. Returns ------- tuple of FemAssemblyGrid The 1D assembly grids along each direction. """ assert len(nquads) == self.ldim assert all(isinstance(nq, int) for nq in nquads) assert all(nq >= 1 for nq in nquads) assembly_grids = [None] * len(nquads) for i, nq in enumerate(nquads): # Get a reference to the local dictionary of FemAssemblyGrid along direction i assembly_grids_dict_i = self._assembly_grids[i] # If there is no FemAssemblyGrid for the required number of quadrature points, # create a new FemAssemblyGrid and store it in the local dictionary. if nq not in assembly_grids_dict_i: V = self.spaces[i] s = int(self._element_starts[i]) e = int(self._element_ends [i]) assembly_grids_dict_i[nq] = FemAssemblyGrid(V, s, e, nderiv=V.degree, nquads=nq) # Store the required FemAssemblyGrid in the list assembly_grids[i] = assembly_grids_dict_i[nq] # Return a tuple with the FemAssemblyGrid objects return tuple(assembly_grids)
@property def local_domain(self): """ Logical domain local to the process, assuming the global domain is decomposed across processes without any overlapping. This information is fundamental for avoiding double-counting when computing integrals over the global domain. Returns ------- element_starts : tuple of int Start element index along each direction. element_ends : tuple of int End element index along each direction. """ return self._element_starts, self._element_ends @property def global_element_starts(self): return self._global_element_starts @property def global_element_ends(self): return self._global_element_ends @property def eta_lims(self): """ Eta limits of domain local to the process (for field evaluation). Returns ------- eta_limits: tuple of (2-tuple of float) Along each dimension i, limits are given as (eta^i_{min}, eta^i_{max}). """ return self._eta_limits # ...
[docs] def init_interpolation(self): for V in self.spaces: # TODO: check if OK to access private attribute... if not V._interpolation_ready: V.init_interpolation(dtype=self.dtype)
# ...
[docs] def init_histopolation(self): for V in self.spaces: # TODO: check if OK to access private attribute... if not V._histopolation_ready: V.init_histopolation(dtype=self.dtype)
# ...
[docs] def compute_interpolant(self, values, field): """ Compute field (i.e. update its spline coefficients) such that it interpolates a certain function $f(x1,x2,..)$ at the Greville points. Parameters ---------- values : StencilVector Function values $f(x_i)$ at the n-dimensional tensor grid of Greville points $x_i$, to be interpolated. field : FemField Input/output argument: tensor spline that has to interpolate the given values. """ assert values.space is self.coeff_space assert isinstance( field, FemField ) assert field.space is self if not self._interpolation_ready: self.init_interpolation() # TODO: check if OK to access private attribute '_interpolator' in self.spaces[i] kronecker_solve( solvers = [V._interpolator for V in self.spaces], rhs = values, out = field.coeffs, )
# ...
[docs] def reduce_grid(self, axes=(), knots=()): """ Create a new TensorFemSpace object with a coarser grid than the original one we do that by giving a new knot sequence in the desired dimension. Parameters ---------- axes : List of int Dimensions where we want to coarsen the grid. knots : List or tuple New knot sequences in each dimension. Returns ------- V : TensorFemSpace New space with a coarser grid. """ assert len(axes) == len(knots) comm = MPI.COMM_WORLD rank = comm.Get_rank() v = self._coeff_space spaces = list(self.spaces) global_starts = list(v._cart._global_starts).copy() global_ends = list(v._cart._global_ends).copy() global_domains_ends = self._global_element_ends for i, axis in enumerate(axes): space = spaces[axis] degree = space.degree periodic = space.periodic breaks = space.breaks T = list(knots[i]).copy() elements_ends = global_domains_ends[axis] boundaries = breaks[elements_ends+1].tolist() for b in boundaries: if b not in T: T.append(b) T.sort() new_space = SplineSpace(degree, knots=T, periodic=periodic, dirichlet=space.dirichlet, basis=space.basis) spaces[axis] = new_space breaks = new_space.breaks.tolist() elements_ends = np.array([breaks.index(bd) for bd in boundaries])-1 elements_starts = np.array([0] + (elements_ends[:-1]+1).tolist()) if periodic: global_starts[axis] = elements_starts global_ends[axis] = elements_ends else: global_starts[axis] = elements_starts + degree - 1 global_ends[axis] = elements_ends + degree - 1 global_ends[axis][-1] += 1 global_starts[axis][0] = 0 cart = v._cart.reduce_grid(tuple(global_starts), tuple(global_ends)) V = TensorFemSpace(cart.domain_decomposition, *spaces, cart=cart, dtype=v.dtype) return V
# ...
[docs] def export_fields(self, filename, **fields): """ Write spline coefficients of given fields to HDF5 file. """ assert isinstance(filename, str) assert all(field.space is self for field in fields.values()) V = self.coeff_space comm = V.cart.comm if V.parallel else None # Multi-dimensional index range local to process index = tuple(slice(s, e+1) for s,e in zip(V.starts, V.ends)) # Create HDF5 file (in parallel mode if MPI communicator size > 1) kwargs = {} if comm is not None: if comm.size > 1: kwargs.update(driver='mpio', comm=comm) h5 = h5py.File(filename, mode='w', **kwargs) # Add field coefficients as named datasets for name,field in fields.items(): dset = h5.create_dataset(name, shape=V.npts, dtype=V.dtype) dset[index] = field.coeffs[index] # Close HDF5 file h5.close()
# ...
[docs] def import_fields(self, filename, *field_names): """ Load fields from HDF5 file containing spline coefficients. Parameters ---------- filename : str Name of HDF5 input file. field_names : list of str Names of the datasets with the required spline coefficients. Returns ------- fields : list of FemSpace objects Distributed fields, given in the same order of the names. """ assert isinstance(filename, str) assert all(isinstance(name, str) for name in field_names) V = self.coeff_space comm = V.cart.comm if V.parallel else None # Multi-dimensional index range local to process index = tuple(slice(s, e+1) for s,e in zip(V.starts, V.ends)) # Open HDF5 file (in parallel mode if MPI communicator size > 1) kwargs = {} if comm is not None: if comm.size > 1: kwargs.update(driver='mpio', comm=comm) h5 = h5py.File(filename, mode='r', **kwargs) # Create fields and load their coefficients from HDF5 datasets fields = [] for name in field_names: dset = h5[name] if dset.shape != V.npts: h5.close() raise TypeError('Dataset not compatible with spline space.') field = FemField(self) field.coeffs[index] = dset[index] field.coeffs.update_ghost_regions() fields.append(field) # Close HDF5 file h5.close() return fields
# ...
[docs] def reduce_degree(self, axes, multiplicity=None, basis='B'): if isinstance(axes, int): axes = [axes] if isinstance(multiplicity, int): multiplicity = [multiplicity] if multiplicity is None: multiplicity = [self.multiplicity[i] for i in axes] v = self._coeff_space spaces = list(self.spaces) for m, axis in zip(multiplicity, axes): space = spaces[axis] reduced_space = SplineSpace( degree = space.degree - 1, pads = space.pads, grid = space.breaks, multiplicity= m, parent_multiplicity=space.multiplicity, periodic = space.periodic, dirichlet = space.dirichlet, basis = basis ) spaces[axis] = reduced_space npts = [s.nbasis for s in spaces] multiplicity = [s.multiplicity for s in spaces] global_starts, global_ends = partition_coefficients(v.cart.domain_decomposition, spaces) # create new CartDecomposition red_cart = v.cart.reduce_npts(npts, global_starts, global_ends, shifts=multiplicity) # create new TensorFemSpace tensor_vec = TensorFemSpace(self._domain_decomposition, *spaces, cart=red_cart, dtype=v.dtype) tensor_vec._interpolation_ready = False for key in self._refined_space: if key == tuple(self.ncells): tensor_vec.set_refined_space(key, tensor_vec) else: tensor_vec.set_refined_space(key, self._refined_space[key].reduce_degree(axes, multiplicity, basis)) return tensor_vec
# ...
[docs] def add_refined_space(self, ncells): """ refine the space with new ncells and add it to the dictionary of refined_space""" ncells = tuple(ncells) if ncells in self._refined_space: return if ncells == tuple(self.ncells): self.set_refined_space(ncells, self) return spaces = [s.refine(n) for s,n in zip(self.spaces, ncells)] npts = [s.nbasis for s in spaces] domain = self.domain_decomposition new_global_starts = [] new_global_ends = [] for i in range(domain.ndim): gs = domain.global_element_starts[i] ge = domain.global_element_ends [i] new_global_starts.append([]) new_global_ends .append([]) for s,e in zip(gs, ge): bs = self.spaces[i].breaks[s] be = self.spaces[i].breaks[e+1] s = spaces[i].breaks.tolist().index(bs) e = spaces[i].breaks.tolist().index(be) new_global_starts[-1].append(s) new_global_ends [-1].append(e-1) new_global_starts[-1] = np.array(new_global_starts[-1]) new_global_ends [-1] = np.array(new_global_ends [-1]) new_domain = domain.refine(ncells, new_global_starts, new_global_ends) new_space = TensorFemSpace(new_domain, *spaces, dtype=self._coeff_space.dtype) self.set_refined_space(ncells, new_space)
# ...
[docs] def create_interface_space(self, axis, ext, cart): """ Create a new interface fem space along a given axis and extremity. Parameters ---------- axis : int The axis of the new Interface space. ext: int The extremity of the new Interface space. the values must be 1 or -1. cart: CartDecomposition The cart of the new space, needed in the parallel case. """ axis = int(axis) ext = int(ext) assert axis < self.ldim assert ext in [-1, 1] if cart.is_comm_null or self._interfaces.get((axis, ext), None): return spaces = self.spaces coeff_space = self.coeff_space coeff_space.set_interface(axis, ext, cart) space = TensorFemSpace(self._domain_decomposition, *spaces, coeff_space=coeff_space.interfaces[axis, ext], dtype=coeff_space.dtype) self._interfaces[axis, ext] = space
[docs] def get_refined_space(self, ncells): return self._refined_space[tuple(ncells)]
[docs] def set_refined_space(self, ncells, new_space): assert all(nc1==nc2 for nc1,nc2 in zip(ncells, new_space.ncells)) self._refined_space[tuple(ncells)] = new_space
# ...
[docs] def plot_2d_decomposition(self, mapping=None, refine=10): import matplotlib.pyplot as plt from matplotlib.patches import Polygon, Patch from psydac.utilities.utils import refine_array_1d if mapping is None: mapping = lambda eta: eta else: assert mapping.ldim == self.ldim == 2 assert mapping.pdim == self.ldim == 2 assert refine >= 1 N = int(refine) V1, V2 = self.spaces mpi_comm = self.coeff_space.cart.comm mpi_rank = mpi_comm.rank # Local grid, refined [sk1, sk2], [ek1, ek2] = self.local_domain eta1 = refine_array_1d(V1.breaks[sk1:ek1+2], N) eta2 = refine_array_1d(V2.breaks[sk2:ek2+2], N) pcoords = np.array([[mapping(e1, e2) for e2 in eta2] for e1 in eta1]) # Local domain as Matplotlib polygonal patch AB = pcoords[ :, 0, :] # eta2 = min BC = pcoords[ -1, :, :] # eta1 = max CD = pcoords[::-1, -1, :] # eta2 = max (points must be reversed) DA = pcoords[ 0, ::-1, :] # eta1 = min (points must be reversed) xy = np.concatenate([AB, BC, CD, DA], axis=0) poly = Polygon(xy, edgecolor='None') # Gather polygons on master process polys = mpi_comm.gather(poly) #------------------------------- # Non-master processes stop here if mpi_rank != 0: return #------------------------------- # Global grid, refined eta1 = refine_array_1d(V1.breaks, N) eta2 = refine_array_1d(V2.breaks, N) pcoords = np.array([[mapping(e1, e2) for e2 in eta2] for e1 in eta1]) xx = pcoords[:, :, 0] yy = pcoords[:, :, 1] # Plot decomposed domain fig, ax = plt.subplots(1, 1) colors = itertools.cycle(plt.rcParams['axes.prop_cycle'].by_key()['color']) handles = [] for i, (poly, color) in enumerate(zip(polys, colors)): # Add patch poly.set_facecolor(color) ax.add_patch(poly) # Create legend entry handle = Patch(color=color, label='Rank {}'.format(i)) handles.append(handle) ax.set_xlabel(r'$x$', rotation='horizontal') ax.set_ylabel(r'$y$', rotation='horizontal') ax.set_title ('Domain decomposition') ax.plot(xx[:,::N] , yy[:,::N] , 'k') ax.plot(xx[::N,:].T, yy[::N,:].T, 'k') ax.set_aspect('equal') ax.legend(handles=handles, bbox_to_anchor=(1.05, 1), loc=2) fig.tight_layout() fig.show()
# ... def __str__(self): """Pretty printing""" txt = '\n' txt += '> ldim :: {ldim}\n'.format(ldim=self.ldim) txt += '> total nbasis :: {dim}\n'.format(dim=self.nbasis) dims = ', '.join(str(V.nbasis) for V in self.spaces) txt += '> nbasis :: ({dims})\n'.format(dims=dims) return txt