Source code for flowjax.bijections.rational_quadratic_spline

"""Rational quadratic spline bijections ("""

from functools import partial
from typing import ClassVar

import jax
import jax.numpy as jnp
from jaxtyping import Array, Float
from paramax import AbstractUnwrappable, Parameterize
from paramax.utils import inv_softplus

from flowjax.bijections.bijection import AbstractBijection

def _real_to_increasing_on_interval(
    arr: Float[Array, " dim"],
    interval: tuple[int | float, int | float],
    softmax_adjust: float = 1e-2,
    pad_with_ends: bool = True,
    """Transform unconstrained vector to monotonically increasing positions on [-B, B].

        arr: Parameter vector.
        interval: Interval to transform output. Defaults to 1.
        softmax_adjust : Rescales softmax output using
            ``(widths + softmax_adjust/widths.size) / (1 + softmax_adjust)``. e.g.
            0=no adjustment, 1=average softmax output with evenly spaced widths, >1
            promotes more evenly spaced widths.
        pad_with_ends: Whether to pad the with -interval and interval. Defaults to True.
    if softmax_adjust < 0:
        raise ValueError("softmax_adjust should be >= 0.")

    widths = jax.nn.softmax(arr)
    widths = (widths + softmax_adjust / widths.size) / (1 + softmax_adjust)
    widths =[0].set(widths[0] / 2)
    scale = interval[1] - interval[0]
    pos = interval[0] + scale * jnp.cumsum(widths)

    if pad_with_ends:
        pos = jnp.pad(pos, pad_width=1, constant_values=interval)

    return pos

[docs] class RationalQuadraticSpline(AbstractBijection): """Scalar RationalQuadraticSpline transformation ( Args: knots: Number of knots. interval: Interval to transform, if a scalar value, uses [-interval, interval], if a tuple, uses [interval[0], interval[1]] min_derivative: Minimum dervivative. Defaults to 1e-3. softmax_adjust: Controls minimum bin width and height by rescaling softmax output, e.g. 0=no adjustment, 1=average softmax output with evenly spaced widths, >1 promotes more evenly spaced widths. See ``real_to_increasing_on_interval``. Defaults to 1e-2. """ knots: int interval: tuple[int | float, int | float] softmax_adjust: float | int min_derivative: float x_pos: Array | AbstractUnwrappable[Array] y_pos: Array | AbstractUnwrappable[Array] derivatives: Array | AbstractUnwrappable[Array] shape: ClassVar[tuple] = () cond_shape: ClassVar[None] = None def __init__( self, *, knots: int, interval: float | int | tuple[int | float, int | float], min_derivative: float = 1e-3, softmax_adjust: float | int = 1e-2, ): self.knots = knots interval = interval if isinstance(interval, tuple) else (-interval, interval) self.interval = interval self.softmax_adjust = softmax_adjust self.min_derivative = min_derivative to_interval = jnp.vectorize( partial( _real_to_increasing_on_interval, interval=interval, softmax_adjust=softmax_adjust, ), signature="(a)->(b)", ) self.x_pos = Parameterize(to_interval, jnp.zeros(knots)) self.y_pos = Parameterize(to_interval, jnp.zeros(knots)) self.derivatives = Parameterize( lambda arr: jax.nn.softplus(arr) + min_derivative, jnp.full(knots + 2, inv_softplus(1 - min_derivative)), ) def transform_and_log_det(self, x, condition=None): # Following notation from the paper x_pos, y_pos, derivatives = self.x_pos, self.y_pos, self.derivatives in_bounds = jnp.logical_and(x >= self.interval[0], x <= self.interval[1]) x_robust = jnp.where(in_bounds, x, 0) # To avoid nans k = jnp.searchsorted(x_pos, x_robust) - 1 # k is bin number xi = (x_robust - x_pos[k]) / (x_pos[k + 1] - x_pos[k]) sk = (y_pos[k + 1] - y_pos[k]) / (x_pos[k + 1] - x_pos[k]) dk, dk1, yk, yk1 = derivatives[k], derivatives[k + 1], y_pos[k], y_pos[k + 1] num = (yk1 - yk) * (sk * xi**2 + dk * xi * (1 - xi)) den = sk + (dk1 + dk - 2 * sk) * xi * (1 - xi) y = yk + num / den # eq. 4 # avoid numerical precision issues transforming from in -> out of bounds y = jnp.clip(y, self.interval[0], self.interval[1]) y = jnp.where(in_bounds, y, x) return y, jnp.log(self.derivative(x)).sum() def inverse_and_log_det(self, y, condition=None): # Following notation from the paper x_pos, y_pos, derivatives = self.x_pos, self.y_pos, self.derivatives in_bounds = jnp.logical_and(y >= self.interval[0], y <= self.interval[1]) y_robust = jnp.where(in_bounds, y, 0) # To avoid nans k = jnp.searchsorted(y_pos, y_robust) - 1 xk, xk1, yk, yk1 = x_pos[k], x_pos[k + 1], y_pos[k], y_pos[k + 1] sk = (yk1 - yk) / (xk1 - xk) y_delta_s_term = (y_robust - yk) * ( derivatives[k + 1] + derivatives[k] - 2 * sk ) a = (yk1 - yk) * (sk - derivatives[k]) + y_delta_s_term b = (yk1 - yk) * derivatives[k] - y_delta_s_term c = -sk * (y_robust - yk) sqrt_term = jnp.sqrt(b**2 - 4 * a * c) xi = (2 * c) / (-b - sqrt_term) x = xi * (xk1 - xk) + xk # avoid numerical precision issues transforming from in -> out of bounds x = jnp.clip(x, self.interval[0], self.interval[1]) x = jnp.where(in_bounds, x, y) return x, -jnp.log(self.derivative(x)).sum()
[docs] def derivative(self, x) -> Array: """The derivative dy/dx of the forward transformation.""" # Following notation from the paper (eq. 5) x_pos, y_pos, derivatives = self.x_pos, self.y_pos, self.derivatives in_bounds = jnp.logical_and(x >= self.interval[0], x <= self.interval[1]) x_robust = jnp.where(in_bounds, x, 0) # To avoid nans k = jnp.searchsorted(x_pos, x_robust) - 1 xi = (x_robust - x_pos[k]) / (x_pos[k + 1] - x_pos[k]) sk = (y_pos[k + 1] - y_pos[k]) / (x_pos[k + 1] - x_pos[k]) dk, dk1 = derivatives[k], derivatives[k + 1] num = sk**2 * (dk1 * xi**2 + 2 * sk * xi * (1 - xi) + dk * (1 - xi) ** 2) den = (sk + (dk1 + dk - 2 * sk) * xi * (1 - xi)) ** 2 derivative = num / den return jnp.where(in_bounds, derivative, 1.0)