Source code for struphy.propagators.variational_density_evolve

import logging
from copy import deepcopy
from dataclasses import dataclass
from typing import Literal

import cunumpy as xp
from feectools.ddm.mpi import mpi as MPI
from feectools.linalg.basic import IdentityOperator
from feectools.linalg.block import BlockLinearOperator, BlockVectorSpace
from feectools.linalg.solvers import inverse
from line_profiler import profile

from struphy.feec import preconditioner
from struphy.feec.preconditioner import MassMatrixDiagonalPreconditioner
from struphy.feec.variational_utilities import (
    InternalEnergyEvaluator,
    KineticEnergyEvaluator,
)
from struphy.io.options import LiteralOptions
from struphy.linear_algebra.solver import NonlinearSolverParameters, SolverParameters
from struphy.models.variables import FEECVariable
from struphy.propagators.base import Propagator
from struphy.utils.utils import check_option

logger = logging.getLogger("struphy")


[docs] class VariationalDensityEvolve(Propagator): r""":ref:`FEEC <gempic>` discretization of the following equations: find :math:`\rho \in L^2` and :math:`\mathbf u \in (H^1)^3` such that .. math:: &\partial_t \rho + \nabla \cdot ( \tilde{\rho} \mathbf u ) = 0 \,, \\[4mm] &\int_\Omega \partial_t (\rho \mathbf u) \cdot \mathbf v\,\textrm d \mathbf x + \int_\Omega \left(\frac{|\mathbf u|^2}{2} - \frac{\partial(\rho \mathcal U(\rho))}{\partial \rho}\right) \nabla \cdot (\tilde{\rho} \mathbf v) \,\textrm d \mathbf x = 0 \qquad \forall \, \mathbf v \in (H^1)^3\,. Where :math:`\tilde{\rho}` is either :math:`\rho` for full-f models, :math:`\rho_0` for linear models or :math:`\rho_0+\rho` for :math:`\delta f` models. In the case of linear model, the second equation is not updated. On the logical domain: .. math:: \begin{align} &\partial_t \hat{\rho}^3 + \nabla \cdot ( \hat{\rho}^3 \hat{\mathbf{u}} ) = 0 ~ , \\[4mm] &\int_{\hat{\Omega}} \partial_t ( \hat{\rho}^3 \hat{\mathbf{u}}) \cdot G \hat{\mathbf{v}} \, \textrm d \boldsymbol \eta + \int_{\hat{\Omega}} \left( \frac{| DF \hat{\mathbf{u}} |^2}{2} - \frac{\partial (\hat{\rho}^3 \mathcal U)}{\partial \hat{\rho}^3} \right) \nabla \cdot (\hat{\rho}^3 \hat{\mathbf{v}}) \, \textrm d \boldsymbol \eta = 0 ~ , \\[2mm] \end{align} where :math:`\mathcal U` depends on the chosen model. It is discretized as .. math:: \begin{align} &\frac{\mathbb M^v[\hat{\rho}_h^{n+1}] \mathbf u^{n+1}- \mathbb M^v[\hat{\rho}_h^n] \mathbf u^n}{\Delta t} + (\mathbb D \hat{\Pi}^{2}[\hat{\rho_h^{n}} \vec{\boldsymbol \Lambda}^v])^\top \hat{l}^3\left(\frac{DF \hat{\mathbf{u}}_h^{n+1} \cdot DF \hat{\mathbf{u}}_h^{n}}{2} - \frac{\hat{\rho}_h^{n+1}\mathcal U(\hat{\rho}_h^{n+1})-\hat{\rho}_h^{n}\mathcal U(\hat{\rho}_h^{n})}{\hat{\rho}_h^{n+1}-\hat{\rho}_h^n} \right) = 0 ~ , \\[2mm] &\frac{\boldsymbol \rho^{n+1}- \boldsymbol \rho^n}{\Delta t} + \mathbb D \hat{\Pi}^{2}[\hat{\rho_h^{n}} \vec{\boldsymbol \Lambda}^v] \mathbf u^{n+1/2} = 0 ~ , \\[2mm] \end{align} where :math:`\hat{l}^3(f)` denotes the vector representing the linear form :math:`v_h \mapsto \int_{\hat{\Omega}} f(\boldsymbol \eta) v_h(\boldsymbol \eta) d \boldsymbol \eta`, that is the vector with components .. math:: \hat{l}^3(f)_{ijk}=\int_{\hat{\Omega}} f \Lambda^3_{ijk} \textrm d \boldsymbol \eta and the weights in the the :class:`~struphy.feec.basis_projection_ops.BasisProjectionOperator` and the :class:`~struphy.feec.mass.WeightedMassOperator` are given by .. math:: \hat{\mathbf{u}}_h^{k} = (\mathbf{u}^{k})^\top \vec{\boldsymbol \Lambda}^v \in (V_h^0)^3 \, \text{for k in} \{n, n+1/2, n+1\}, \qquad \hat{\rho}_h^{k} = (\rho^{k})^\top \vec{\boldsymbol \Lambda}^3 \in V_h^3 \, \text{for k in} \{n, n+1/2, n+1\} . """
[docs] class Variables: """Container for variables advanced by :class:`VariationalDensityEvolve`. Attributes ---------- rho : FEECVariable Density variable in ``"L2"`` space. u : FEECVariable Velocity variable in ``"H1vec"`` space. """ def __init__(self): self._rho: FEECVariable = None self._u: FEECVariable = None @property def rho(self) -> FEECVariable: return self._rho @rho.setter def rho(self, new): assert isinstance(new, FEECVariable) assert new.space == "L2" self._rho = new @property def u(self) -> FEECVariable: return self._u @u.setter def u(self, new): assert isinstance(new, FEECVariable) assert new.space == "H1vec" self._u = new
def __init__(self): self.variables = self.Variables()
[docs] @dataclass class Options: """Configuration options for :class:`VariationalDensityEvolve`. Parameters ---------- model : {"pressureless", "barotropic", "full", "full_p", "full_q", "linear", "deltaf", "linear_q", "deltaf_q"}, default="barotropic" Density/thermodynamic model variant. gamma : float, default=5/3 Adiabatic index. solver : LiteralOptions.OptsSymmSolver, default="pcg" Linear solver for implicit substeps. precond : LiteralOptions.OptsMassPrecond, default="MassMatrixPreconditioner" Preconditioner used in linear solves. solver_params : SolverParameters, default=None Linear-solver controls. nonlin_solver : NonlinearSolverParameters, default=None Nonlinear iteration controls. s : FEECVariable, default=None Entropy-like variable required by ``model="full"``. """ # specific literals OptsModel = Literal[ "pressureless", "barotropic", "full", "full_p", "full_q", "linear", "deltaf", "linear_q", "deltaf_q", ] # propagator options model: OptsModel = "barotropic" gamma: float = 5.0 / 3.0 solver: LiteralOptions.OptsSymmSolver = "pcg" precond: LiteralOptions.OptsMassPrecond = "MassMatrixPreconditioner" solver_params: SolverParameters = None nonlin_solver: NonlinearSolverParameters = None s: FEECVariable = None def __post_init__(self): # checks check_option(self.model, self.OptsModel) check_option(self.solver, LiteralOptions.OptsSymmSolver) check_option(self.precond, LiteralOptions.OptsMassPrecond) # defaults if self.solver_params is None: self.solver_params = SolverParameters() if self.nonlin_solver is None: self.nonlin_solver = NonlinearSolverParameters()
@property def options(self) -> Options: if not hasattr(self, "_options"): self._options = self.Options() return self._options @options.setter def options(self, new): assert isinstance(new, self.Options) self._options = new @profile def allocate(self, verbose: bool = False): if self.options.model == "full": assert self.options.s is not None self._model = self.options.model self._gamma = self.options.gamma self._s = self.options.s self._lin_solver = self.options.solver_params self._nonlin_solver = self.options.nonlin_solver self._linearize = self.options.nonlin_solver.linearize self._info = self.options.nonlin_solver.info and (MPI.COMM_WORLD.Get_rank() == 0) self._Mrho = self.mass_ops.WMMnew pc = MassMatrixDiagonalPreconditioner(self._Mrho) self._Mrho_inv = inverse( self._Mrho, "pcg", pc=pc, tol=1e-16, maxiter=500, recycle=True, ) # Femfields for the projector self.rhof = self.derham.create_spline_function("rhof", "L2") self.rhof1 = self.derham.create_spline_function("rhof1", "L2") rho = self.variables.rho.spline.vector u = self.variables.u.spline.vector # Projector self._energy_evaluator = InternalEnergyEvaluator(self.derham, self._gamma) self._kinetic_evaluator = KineticEnergyEvaluator(self.derham, self.domain, self.mass_ops) self._initialize_projectors_and_mass() if self._model in ["linear", "linear_q"]: rhotmp = self.projected_equil.n3 elif self._model in ["deltaf", "deltaf_q"]: self._tmp_rho_deltaf = rho.space.zeros() rhotmp = rho.copy(out=self._tmp_rho_deltaf) rhotmp += self.projected_equil.n3 else: rhotmp = rho self._update_weighted_MM(rhotmp) # bunch of temporaries to avoid allocating in the loop self._tmp_un1 = u.space.zeros() self._tmp_un2 = u.space.zeros() self._tmp_un12 = u.space.zeros() self._tmp_rhon1 = rho.space.zeros() self._tmp_un_diff = u.space.zeros() self._tmp_rhon12 = rho.space.zeros() self._tmp_rhon_diff = rho.space.zeros() self._tmp_un_weak_diff = u.space.zeros() self._tmp_mn_diff = u.space.zeros() self._tmp_mn = u.space.zeros() self._tmp_mn1 = u.space.zeros() self._tmp_advection = u.space.zeros() self._tmp_rho_advection = rho.space.zeros() self._linear_form_dl_drho = rho.space.zeros() # Compute the initial force in case we want to 'linearize' around a given equilibrium if self._linearize: self._compute_init_linear_form() if self._model in ["linear", "linear_q"]: self._update_Pirho(self.projected_equil.n3) @profile def __call__(self, dt): self.__call_newton(dt) def __call_newton(self, dt): """Solve the non linear system for updating the variables using Newton iteration method""" if self._info: logger.info("") logger.info("Newton iteration in VariationalDensityEvolve") # Initial variables rhon = self.variables.rho.spline.vector un = self.variables.u.spline.vector if self._model in ["linear", "linear_q"]: advection = self.divPirho.dot(un, out=self._tmp_rho_advection) advection *= dt rhon1 = rhon.copy(out=self._tmp_rhon1) rhon1 -= advection self.update_feec_variables(rho=rhon1, u=un) return if self._model in ["deltaf", "deltaf_q"]: rho = rhon.copy(out=self._tmp_rho_deltaf) rho += self.projected_equil.n3 else: rho = rhon self._update_weighted_MM(rho) mn = self._Mrho.dot(un, out=self._tmp_mn) # Initialize variable for Newton iteration if self._model == "full": s = self._s.spline.vector else: s = None if self._model in ["deltaf", "deltaf_q"]: rho = rhon.copy(out=self._tmp_rho_deltaf) rho += self.projected_equil.n3 else: rho = rhon self._update_Pirho(rho) rhon1 = rhon.copy(out=self._tmp_rhon1) rhon1 += self._tmp_rhon_diff if self._model in ["deltaf", "deltaf_q"]: rho = rhon1.copy(out=self._tmp_rho_deltaf) rho += self.projected_equil.n3 else: rho = rhon1 self._update_weighted_MM(rho) un1 = un.copy(out=self._tmp_un1) un1 += self._tmp_un_diff mn1 = self._Mrho.dot(un1, out=self._tmp_mn1) tol = self._nonlin_solver.tol err = tol + 1 for it in range(self._nonlin_solver.maxiter): # Newton iteration un12 = un.copy(out=self._tmp_un12) un12 += un1 un12 *= 0.5 # rhon12 = rhon.copy(out=self._tmp_rhon12) # rhon12 += rhon1 # rhon12 *= 0.5 # if self._model == "deltaf": # rhon12 += self.projected_equil.n3 # self._update_Pirho(rhon12) # Update the linear form self._update_linear_form_dl_drho(rhon, rhon1, un, un1, s) # Compute the advection terms advection = self.divPirhoT.dot( self._linear_form_dl_drho, out=self._tmp_advection, ) advection *= dt rho_advection = self.divPirho.dot( un12, out=self._tmp_rho_advection, ) rho_advection *= dt # Get diff rhon_diff = rhon1.copy(out=self._tmp_rhon_diff) rhon_diff -= rhon rhon_diff += rho_advection mn_diff = mn1.copy(out=self._tmp_mn_diff) mn_diff -= mn mn_diff += advection # Get error err = self._get_error_newton(mn_diff, rhon_diff) if self._info: logger.info(f"iteration : {it} error : {err}") if err < tol**2 or xp.isnan(err): break # Derivative for Newton self._get_jacobian(dt, rhon, rhon1, un, un1, s) # Newton step self._tmp_f[0] = mn_diff self._tmp_f[1] = rhon_diff incr = self._inv_Jacobian.dot(self._tmp_f, out=self._tmp_incr) if self._info: logger.info( "information on the linear solver : ", self._inv_Jacobian._solver._info, ) un1 -= incr[0] rhon1 -= incr[1] # Multiply by the mass matrix to get the momentum if self._model in ["deltaf", "deltaf_q"]: rho = rhon1.copy(out=self._tmp_rho_deltaf) rho += self.projected_equil.n3 self._update_weighted_MM(rho) else: self._update_weighted_MM(rhon1) mn1 = self._Mrho.dot(un1, out=self._tmp_mn1) if it == self._nonlin_solver.maxiter - 1 or xp.isnan(err): logger.info( f"!!!Warning: Maximum iteration in VariationalDensityEvolve reached - not converged:\n {err =} \n {tol**2 =}", ) self._tmp_un_diff = un1 - un self._tmp_rhon_diff = rhon1 - rhon self.update_feec_variables(rho=rhon1, u=un1) def _initialize_projectors_and_mass(self): """Initialization of all the `BasisProjectionOperator` and `CoordinateProjector` needed to compute the bracket term""" from struphy.feec.mass import L2Projector from struphy.feec.variational_utilities import L2_transport_operator # Initialize the transport operator and transposed self.divPirho = L2_transport_operator(self.derham) self.divPirhoT = self.divPirho.T # Inverse mass matrix needed to compute the error self.pc_Mv = preconditioner.MassMatrixDiagonalPreconditioner( self.mass_ops.Mv, ) self._inv_Mv = inverse( self.mass_ops.Mv, "pcg", pc=self.pc_Mv, tol=1e-16, maxiter=1000, verbose=False, recycle=True, ) integration_grid = [grid_1d.flatten() for grid_1d in self.derham.V0splines.quad_grid_pts[0]] self.integration_grid_spans, self.integration_grid_bn, self.integration_grid_bd = ( self.derham.prepare_eval_tp_fixed( integration_grid, ) ) # tmps grid_shape = tuple([len(loc_grid) for loc_grid in integration_grid]) self._rhof_values = xp.zeros(grid_shape, dtype=float) # Other mass matrices for newton solve self._M_drho = self.mass_ops.create_weighted_mass("L2", "L2") Jacs = BlockVectorSpace( self.derham.Vvpol, self.derham.V3pol, ) self._tmp_f = Jacs.zeros() self._tmp_incr = Jacs.zeros() self._Jacobian = BlockLinearOperator(Jacs, Jacs) # local version to avoid creating new version of LinearOperator every time self._I3 = IdentityOperator(self.derham.V3pol) self._dt_pc_divPirhoT = 2 * (self.divPirhoT) self._dt2_pc_divPirhoT = 2 * (self.divPirhoT) self._dt2_divPirho = 2 * self.divPirho self._Jacobian[0, 0] = self._Mrho + self._dt2_pc_divPirhoT @ self._kinetic_evaluator.M_un self._Jacobian[0, 1] = self._kinetic_evaluator.M_un1 + self._dt_pc_divPirhoT @ self._M_drho self._Jacobian[1, 0] = self._dt2_divPirho self._Jacobian[1, 1] = self._I3 from struphy.linear_algebra.schur_solver import SchurSolverFull self._inv_Jacobian = SchurSolverFull( self._Jacobian, "pbicgstab", pc=self._Mrho_inv, tol=self._lin_solver.tol, maxiter=self._lin_solver.maxiter, verbose=self._lin_solver.verbose, recycle=True, ) # self._inv_Jacobian = inverse(self._Jacobian, # 'gmres', # tol=self._lin_solver.tol, # maxiter=self._lin_solver.maxiter, # verbose=self._lin_solver.verbose, # recycle=True) # L2-projector for V3 self._get_L2dofs_V3 = L2Projector("L2", self.mass_ops).get_dofs grid_shape = tuple([len(loc_grid) for loc_grid in integration_grid]) # tmps self._eval_dl_drho = xp.zeros(grid_shape, dtype=float) self._uf_values = [xp.zeros(grid_shape, dtype=float) for i in range(3)] self._uf1_values = [xp.zeros(grid_shape, dtype=float) for i in range(3)] self._tmp_int_grid = xp.zeros(grid_shape, dtype=float) self._tmp_int_grid2 = xp.zeros(grid_shape, dtype=float) self._rhof_values = xp.zeros(grid_shape, dtype=float) self._rhof1_values = xp.zeros(grid_shape, dtype=float) if self._model == "full": self._tmp_de_drho = xp.zeros(grid_shape, dtype=float) gam = self._gamma metric = xp.power( self.domain.jacobian_det( *integration_grid, ), 2 - gam, ) self._proj_rho2_metric_term = deepcopy(metric) metric = xp.power( self.domain.jacobian_det( *integration_grid, ), 1 - gam, ) self._proj_drho_metric_term = deepcopy(metric) if self._linearize: self._init_dener_drho = xp.zeros(grid_shape, dtype=float) def _update_Pirho(self, rho): """Update the weights of the `BasisProjectionOperator` Pirho""" self.divPirho.update_coeffs(rho) self.divPirhoT.update_coeffs(rho) def _update_weighted_MM(self, rho): """update the weighted mass matrix operator""" self._Mrho.spline_functions["l2_field"].vector = rho self._Mrho.assemble() logger.debug(f"In VariationalDensityEvolve: {self._Mrho_inv._options['pc'] = }") if hasattr(self, "_Mrho_inv") and isinstance(self._Mrho_inv._options["pc"], MassMatrixDiagonalPreconditioner): self._Mrho_inv._options["pc"].update_mass_operator(self._Mrho) def _update_linear_form_dl_drho(self, rhon, rhon1, un, un1, sn): """Update the linearform representing integration in V3 against kinetic energy""" self._kinetic_evaluator.get_u2_grid(un, un1, self._eval_dl_drho) if self._model == "barotropic": rhof_values = self.rhof.eval_tp_fixed_loc( self.integration_grid_spans, self.integration_grid_bd, out=self._rhof_values, ) rhof1_values = self.rhof1.eval_tp_fixed_loc( self.integration_grid_spans, self.integration_grid_bd, out=self._rhof1_values, ) # self._eval_dl_drho -= (rhof_values + rhof1_values)/2 rhof_values /= 2 rhof1_values /= 2 self._eval_dl_drho -= rhof_values self._eval_dl_drho -= rhof1_values if self._model == "full": self._energy_evaluator.evaluate_discrete_de_drho_grid(rhon, rhon1, sn, out=self._tmp_de_drho) self._tmp_int_grid *= 0 self._tmp_int_grid += self._tmp_de_drho if self._linearize: self._tmp_int_grid -= self._init_dener_drho self._tmp_int_grid *= self._proj_rho2_metric_term # self._eval_dl_drho -= self._proj_rho2_metric_term * (self._energy_evaluator._DG_values + de_rhom_s) self._eval_dl_drho -= self._tmp_int_grid self._get_L2dofs_V3(self._eval_dl_drho, dofs=self._linear_form_dl_drho) def _compute_init_linear_form(self): if abs(self._gamma - 5 / 3) < 1e-3: self._energy_evaluator.evaluate_exact_de_drho_grid( self.projected_equil.n3, self.projected_equil.s3_monoatomic, out=self._init_dener_drho, ) elif abs(self._gamma - 7 / 5) < 1e-3: self._energy_evaluator.evaluate_exact_de_drho_grid( self.projected_equil.n3, self.projected_equil.s3_diatomic, out=self._init_dener_drho, ) else: raise ValueError("Gamma should be 7/5 or 5/3 for if you want to linearize") def _get_jacobian(self, dt, rhon, rhon1, un, un1, sn): self._kinetic_evaluator.assemble_M_un(un) self._kinetic_evaluator.assemble_M_un1(un1) if self._model == "barotropic": self._M_drho = -self.mass_ops.M3 / 2.0 elif self._model == "full": self._energy_evaluator.evaluate_discrete_d2e_drho2_grid(rhon, rhon1, sn, out=self._tmp_int_grid) self._tmp_int_grid *= self._proj_drho_metric_term self._M_drho.assemble([[self._tmp_int_grid]]) else: self._M_drho.assemble([[0.0 * self._tmp_int_grid]]) # This way we can update only the scalar multiplying the operator and avoid creating multiple operators self._dt_pc_divPirhoT._scalar = dt self._dt2_pc_divPirhoT._scalar = dt / 2 self._dt2_divPirho._scalar = dt / 2 def _get_error_newton(self, mn_diff, rhon_diff): """Error for the newton method : max(|f(0)|,|f(1)|) where f is the function we're trying to nullify""" err_u = self._inv_Mv.dot_inner( self.derham.boundary_ops["v"].dot(mn_diff), mn_diff, ) err_rho = self.mass_ops.M3.dot_inner(rhon_diff, rhon_diff) return max(err_rho, err_u)