EOS interpolation

Shared infrastructure for PALEOS-format tables: file readers (load_paleos_table for the 2-phase format, load_paleos_unified_table for the single-file-per-material format), the log-uniform grid builder used by both, and the inner kernels _fast_bilinear (O(1) bilinear lookup) and _paleos_clamp_temperature (per-cell temperature clamp to the table's valid range). These kernels are called millions of times per Zalmoxis solve, so they avoid np.interp binary searches by exploiting the log-uniform grid. The cache populated by _ensure_unified_cache is reused across calls within the same process.
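
To illustrate why a log-uniform axis removes the need for a binary search, the following minimal sketch compares a direct index computation with `bisect` from the standard library. The axis (log10(P) from 5 to 14 with 10 points per decade) and the helper name `uniform_index` are assumptions made for the example; they are not values or functions taken from Zalmoxis.

```python
import bisect


def uniform_index(x, x_min, dx, n):
    """Lower cell index on a uniform axis, clamped to [0, n - 2]."""
    i = int((x - x_min) / dx)
    return min(max(i, 0), n - 2)


# Hypothetical axis: log10(P) from 5 to 14, 10 points per decade (91 nodes).
n = 91
dlog_p = (14.0 - 5.0) / (n - 1)
log_p_axis = [5.0 + k * dlog_p for k in range(n)]

query = 9.234
i_fast = uniform_index(query, log_p_axis[0], dlog_p, n)  # one division, no search
i_search = bisect.bisect_right(log_p_axis, query) - 1    # O(log n) binary search
```

Both approaches land in the same cell; the direct computation only works because the node spacing is constant.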

interpolation

Shared interpolation helpers for PALEOS EOS tables.

Provides table loading, bilinear interpolation, and per-cell temperature clamping used by all PALEOS-based EOS lookups.

load_paleos_table(eos_file)

Load a PALEOS MgSiO3 table and build RegularGridInterpolator objects.

The PALEOS tables have 10 columns (SI units): P, T, rho, u, s, cp, cv, alpha, nabla_ad, phase_id(string). The grid is log-uniform in both P and T with 150 points per decade. Some grid cells are missing (unconverged corners), filled with NaN. A P=0 row may exist and is excluded for log interpolation.

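Parameters:

Name      Type  Description                     Default
eos_file  str   Path to the PALEOS table file.  required

Returns:

Type  Description
dict  Cache entry with keys:
      - 'type': 'paleos'
      - 'density_interp': RegularGridInterpolator for rho(log10P, log10T)
      - 'nabla_ad_interp': RegularGridInterpolator for nabla_ad(log10P, log10T)
      - 'density_nn', 'nabla_ad_nn': NearestNDInterpolator fallbacks
      - 'p_min', 'p_max': pressure bounds in Pa
      - 't_min', 't_max': temperature bounds in K
      - 'unique_log_p', 'unique_log_t': unique log10(P) and log10(T) grid values
      - 'logt_valid_min', 'logt_valid_max': per-pressure log10(T) bounds
      - 'density_grid', 'nabla_ad_grid': 2D value grids, shape (n_p, n_t)
      - 'logp_min', 'logp_max', 'logt_min', 'logt_max', 'dlog_p', 'dlog_t', 'n_p', 'n_t': grid metadata for the O(1) bilinear kernels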

Source code in src/zalmoxis/eos/interpolation.py
def load_paleos_table(eos_file):
    """Load a PALEOS MgSiO3 table and build RegularGridInterpolator objects.

    The PALEOS tables have 10 columns (SI units):
    P, T, rho, u, s, cp, cv, alpha, nabla_ad, phase_id(string).
    The grid is log-uniform in both P and T with 150 points per decade.
    Some grid cells are missing (unconverged corners), filled with NaN.
    A P=0 row may exist and is excluded for log interpolation.

    Parameters
    ----------
    eos_file : str
        Path to the PALEOS table file.

    Returns
    -------
    dict
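        Cache entry with keys:
        - ``'type'``: ``'paleos'``
        - ``'density_interp'``: RegularGridInterpolator for rho(log10P, log10T)
        - ``'nabla_ad_interp'``: RegularGridInterpolator for nabla_ad(log10P, log10T)
        - ``'density_nn'``, ``'nabla_ad_nn'``: NearestNDInterpolator fallbacks
        - ``'p_min'``, ``'p_max'``: pressure bounds in Pa
        - ``'t_min'``, ``'t_max'``: temperature bounds in K
        - ``'unique_log_p'``, ``'unique_log_t'``: unique log10(P), log10(T) grid values
        - ``'logt_valid_min'``, ``'logt_valid_max'``: per-pressure log10(T) bounds
        - ``'density_grid'``, ``'nabla_ad_grid'``: 2D value grids, shape (n_p, n_t)
        - ``'logp_min'``, ``'logp_max'``, ``'logt_min'``, ``'logt_max'``,
          ``'dlog_p'``, ``'dlog_t'``, ``'n_p'``, ``'n_t'``: grid metadata for
          the O(1) bilinear kernels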
    """
    # Read only numeric columns (0-8), skipping the string phase_id column (9)
    data = np.genfromtxt(eos_file, usecols=range(9), comments='#')

    pressures = data[:, 0]
    temps = data[:, 1]
    densities = data[:, 2]
    nabla_ad = data[:, 8]

    # Filter out P=0 rows (log10(0) is undefined)
    valid = pressures > 0
    pressures = pressures[valid]
    temps = temps[valid]
    densities = densities[valid]
    nabla_ad = nabla_ad[valid]

    # Work in log10 space for the grid axes
    log_p = np.log10(pressures)
    log_t = np.log10(temps)

    unique_log_p = np.unique(log_p)
    unique_log_t = np.unique(log_t)

    n_p = len(unique_log_p)
    n_t = len(unique_log_t)

    # Build 2D grids filled with NaN for missing cells
    density_grid = np.full((n_p, n_t), np.nan)
    nabla_ad_grid = np.full((n_p, n_t), np.nan)

    # Map log_p and log_t values to grid indices
    p_idx_map = {v: i for i, v in enumerate(unique_log_p)}
    t_idx_map = {v: i for i, v in enumerate(unique_log_t)}

    for k in range(len(pressures)):
        ip = p_idx_map[log_p[k]]
        it = t_idx_map[log_t[k]]
        density_grid[ip, it] = densities[k]
        nabla_ad_grid[ip, it] = nabla_ad[k]

    density_interp = RegularGridInterpolator(
        (unique_log_p, unique_log_t),
        density_grid,
        bounds_error=False,
        fill_value=np.nan,
    )
    nabla_ad_interp = RegularGridInterpolator(
        (unique_log_p, unique_log_t),
        nabla_ad_grid,
        bounds_error=False,
        fill_value=np.nan,
    )

    # Per-pressure valid T bounds for per-cell clamping.
    # At each pressure row, find the min and max log10(T) with finite density.
    # This lets us clamp queries into the valid domain when the grid has NaN
    # holes in the corners (e.g. high T at low P in the liquid table).
    logt_valid_min = np.full(n_p, np.nan)
    logt_valid_max = np.full(n_p, np.nan)
    for ip in range(n_p):
        finite_mask = np.isfinite(density_grid[ip, :])
        if finite_mask.any():
            valid_indices = np.where(finite_mask)[0]
            logt_valid_min[ip] = unique_log_t[valid_indices[0]]
            logt_valid_max[ip] = unique_log_t[valid_indices[-1]]

    # Nearest-neighbor fallback interpolators built from valid cells only.
    # Used when bilinear interpolation returns NaN near the ragged domain
    # boundary (a valid cell neighboring a NaN cell poisons bilinear output).
    valid_cells = np.isfinite(density_grid)
    ip_valid, it_valid = np.where(valid_cells)
    coords_valid = np.column_stack([unique_log_p[ip_valid], unique_log_t[it_valid]])

    density_nn = NearestNDInterpolator(coords_valid, density_grid[valid_cells])
    nabla_ad_nn = NearestNDInterpolator(
        coords_valid[np.isfinite(nabla_ad_grid[valid_cells])],
        nabla_ad_grid[valid_cells][np.isfinite(nabla_ad_grid[valid_cells])],
    )

    # Precompute grid spacing for O(1) bilinear interpolation.
    dlog_p = (unique_log_p[-1] - unique_log_p[0]) / (n_p - 1) if n_p > 1 else 1.0
    dlog_t = (unique_log_t[-1] - unique_log_t[0]) / (n_t - 1) if n_t > 1 else 1.0

    return {
        'type': 'paleos',
        'density_interp': density_interp,
        'nabla_ad_interp': nabla_ad_interp,
        'density_nn': density_nn,
        'nabla_ad_nn': nabla_ad_nn,
        'p_min': 10.0 ** unique_log_p[0],
        'p_max': 10.0 ** unique_log_p[-1],
        't_min': 10.0 ** unique_log_t[0],
        't_max': 10.0 ** unique_log_t[-1],
        'unique_log_p': unique_log_p,
        'unique_log_t': unique_log_t,
        'logt_valid_min': logt_valid_min,
        'logt_valid_max': logt_valid_max,
        # Fast bilinear interpolation data
        'density_grid': density_grid,
        'nabla_ad_grid': nabla_ad_grid,
        'logp_min': unique_log_p[0],
        'logp_max': unique_log_p[-1],
        'logt_min': unique_log_t[0],
        'logt_max': unique_log_t[-1],
        'dlog_p': dlog_p,
        'dlog_t': dlog_t,
        'n_p': n_p,
        'n_t': n_t,
    }
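
The 10-column layout and the NaN-filled grid can be tried out on a tiny synthetic table. The sketch below is illustrative only: the density values are made up, and it scatters rows into the grid with `np.unique(..., return_inverse=True)` rather than the dictionary lookup loop used in the listing above, which is a substitution chosen for brevity and not the library's method.

```python
import io

import numpy as np

# Synthetic 2 x 3 table in the 10-column layout (all values are made up).
rows = []
for p in (1e9, 1e10):
    for t in (1e3, 1e4, 1e5):
        rho = 3000.0 + 1e-7 * p - 1e-3 * t
        rows.append(f'{p:.6e} {t:.6e} {rho:.6e} 0 0 0 0 0 0.3 solid')

# Drop the last row to imitate an unconverged corner of the grid.
text = '# P T rho u s cp cv alpha nabla_ad phase\n' + '\n'.join(rows[:-1])

# Read only the nine numeric columns, skipping the phase string.
data = np.genfromtxt(io.StringIO(text), usecols=range(9), comments='#')

log_p = np.log10(data[:, 0])
log_t = np.log10(data[:, 1])
unique_log_p, ip = np.unique(log_p, return_inverse=True)
unique_log_t, it = np.unique(log_t, return_inverse=True)

# Missing cells stay NaN; present cells are scattered in one vectorized step.
density_grid = np.full((len(unique_log_p), len(unique_log_t)), np.nan)
density_grid[ip, it] = data[:, 2]
```

The missing (highest P, highest T) row ends up as a NaN cell, which is the situation the per-pressure temperature bounds and nearest-neighbor fallbacks are built to handle.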

load_paleos_unified_table(eos_file)

Load a unified PALEOS table (single file per material) and build interpolators.

The unified tables use the same 10-column format as the 2-phase tables (P, T, rho, u, s, cp, cv, alpha, nabla_ad, phase_id) but contain all stable phases for a material in a single file. The thermodynamically stable phase at each (P, T) is encoded in the phase column.

In addition to density and nabla_ad interpolators, this function extracts the liquidus boundary from the phase column: for each pressure row, it finds the lowest temperature where the phase is 'liquid'.

Parameters:

Name      Type  Description                             Default
eos_file  str   Path to the unified PALEOS table file.  required

Returns:

Type  Description
dict  Cache entry with keys:
      - 'type': 'paleos_unified'
      - 'density_interp': RegularGridInterpolator for rho(log10P, log10T)
      - 'nabla_ad_interp': RegularGridInterpolator for nabla_ad(log10P, log10T)
      - 'density_nn', 'nabla_ad_nn': NearestNDInterpolator fallbacks
      - 'p_min', 'p_max': pressure bounds in Pa
      - 't_min', 't_max': temperature bounds in K
      - 'unique_log_p', 'unique_log_t': unique log10(P) and log10(T) grid values
      - 'logt_valid_min', 'logt_valid_max': per-pressure log10(T) bounds
      - 'liquidus_log_p': log10(P) array for the extracted liquidus
      - 'liquidus_log_t': log10(T_liquidus) array at each pressure
      - 'phase_grid': 2D string array of phase identifiers
      - 'density_grid', 'nabla_ad_grid': 2D value grids, shape (n_p, n_t)
      - 'logp_min', 'logp_max', 'logt_min', 'logt_max', 'dlog_p', 'dlog_t', 'n_p', 'n_t': grid metadata for the O(1) bilinear kernels

Source code in src/zalmoxis/eos/interpolation.py
def load_paleos_unified_table(eos_file):
    """Load a unified PALEOS table (single file per material) and build interpolators.

    The unified tables use the same 10-column format as the 2-phase tables
    (P, T, rho, u, s, cp, cv, alpha, nabla_ad, phase_id) but contain all
    stable phases for a material in a single file. The thermodynamically
    stable phase at each (P, T) is encoded in the phase column.

    In addition to density and nabla_ad interpolators, this function
    extracts the liquidus boundary from the phase column: for each pressure
    row, it finds the lowest temperature where the phase is 'liquid'.

    Parameters
    ----------
    eos_file : str
        Path to the unified PALEOS table file.

    Returns
    -------
    dict
        Cache entry with keys:
        - ``'type'``: ``'paleos_unified'``
        - ``'density_interp'``: RegularGridInterpolator for rho(log10P, log10T)
        - ``'nabla_ad_interp'``: RegularGridInterpolator for nabla_ad(log10P, log10T)
        - ``'density_nn'``, ``'nabla_ad_nn'``: NearestNDInterpolator fallbacks
        - ``'p_min'``, ``'p_max'``: pressure bounds in Pa
        - ``'t_min'``, ``'t_max'``: temperature bounds in K
        - ``'unique_log_p'``, ``'unique_log_t'``: unique log10(P), log10(T) grid values
        - ``'logt_valid_min'``, ``'logt_valid_max'``: per-pressure log10(T) bounds
        - ``'liquidus_log_p'``: log10(P) array for the extracted liquidus
        - ``'liquidus_log_t'``: log10(T_liquidus) array at each pressure
        - ``'phase_grid'``: 2D string array of phase identifiers
        - ``'density_grid'``, ``'nabla_ad_grid'``: 2D value grids, shape (n_p, n_t)
        - ``'logp_min'``, ``'logp_max'``, ``'logt_min'``, ``'logt_max'``,
          ``'dlog_p'``, ``'dlog_t'``, ``'n_p'``, ``'n_t'``: grid metadata for
          the O(1) bilinear kernels
    """
    # Single-pass read: parse numeric columns and phase string together.
    # Avoids the 2x penalty of calling genfromtxt twice on 50-140 MB files.
    numeric_rows = []
    phase_list = []
    with open(eos_file) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            parts = line.split()
            numeric_rows.append([float(x) for x in parts[:9]])
            phase_list.append(parts[9] if len(parts) > 9 else '')

    data_numeric = np.array(numeric_rows)
    phase_strings = np.array(phase_list, dtype=str)

    pressures = data_numeric[:, 0]
    temps = data_numeric[:, 1]
    densities = data_numeric[:, 2]
    nabla_ad = data_numeric[:, 8]

    # Filter out P=0 rows
    valid = pressures > 0
    pressures = pressures[valid]
    temps = temps[valid]
    densities = densities[valid]
    nabla_ad = nabla_ad[valid]
    phase_strings = np.char.strip(phase_strings[valid])

    # Work in log10 space
    log_p = np.log10(pressures)
    log_t = np.log10(temps)

    unique_log_p = np.unique(log_p)
    unique_log_t = np.unique(log_t)

    n_p = len(unique_log_p)
    n_t = len(unique_log_t)

    # Build 2D grids
    density_grid = np.full((n_p, n_t), np.nan)
    nabla_ad_grid = np.full((n_p, n_t), np.nan)
    phase_grid = np.full((n_p, n_t), '', dtype=object)

    p_idx_map = {v: i for i, v in enumerate(unique_log_p)}
    t_idx_map = {v: i for i, v in enumerate(unique_log_t)}

    for k in range(len(pressures)):
        ip = p_idx_map[log_p[k]]
        it = t_idx_map[log_t[k]]
        density_grid[ip, it] = densities[k]
        nabla_ad_grid[ip, it] = nabla_ad[k]
        phase_grid[ip, it] = phase_strings[k]

    density_interp = RegularGridInterpolator(
        (unique_log_p, unique_log_t),
        density_grid,
        bounds_error=False,
        fill_value=np.nan,
    )
    nabla_ad_interp = RegularGridInterpolator(
        (unique_log_p, unique_log_t),
        nabla_ad_grid,
        bounds_error=False,
        fill_value=np.nan,
    )

    # Per-pressure valid T bounds
    logt_valid_min = np.full(n_p, np.nan)
    logt_valid_max = np.full(n_p, np.nan)
    for ip in range(n_p):
        finite_mask = np.isfinite(density_grid[ip, :])
        if finite_mask.any():
            valid_indices = np.where(finite_mask)[0]
            logt_valid_min[ip] = unique_log_t[valid_indices[0]]
            logt_valid_max[ip] = unique_log_t[valid_indices[-1]]

    # Nearest-neighbor fallback interpolators
    valid_cells = np.isfinite(density_grid)
    ip_valid, it_valid = np.where(valid_cells)
    coords_valid = np.column_stack([unique_log_p[ip_valid], unique_log_t[it_valid]])

    density_nn = NearestNDInterpolator(coords_valid, density_grid[valid_cells])

    nabla_valid = np.isfinite(nabla_ad_grid[valid_cells])
    nabla_ad_nn = NearestNDInterpolator(
        coords_valid[nabla_valid],
        nabla_ad_grid[valid_cells][nabla_valid],
    )

    # Extract liquidus boundary from the phase column.
    # For each pressure row, find the lowest T where phase == 'liquid'.
    liquidus_log_p = []
    liquidus_log_t = []
    for ip in range(n_p):
        liquid_mask = phase_grid[ip, :] == 'liquid'
        if liquid_mask.any():
            first_liquid_idx = np.where(liquid_mask)[0][0]
            liquidus_log_p.append(unique_log_p[ip])
            liquidus_log_t.append(unique_log_t[first_liquid_idx])

    liquidus_log_p = np.array(liquidus_log_p)
    liquidus_log_t = np.array(liquidus_log_t)

    # Precompute grid spacing for O(1) bilinear interpolation.
    # Both PALEOS and Chabrier grids are log-uniform (constant dlogP, dlogT).
    dlog_p = (unique_log_p[-1] - unique_log_p[0]) / (n_p - 1) if n_p > 1 else 1.0
    dlog_t = (unique_log_t[-1] - unique_log_t[0]) / (n_t - 1) if n_t > 1 else 1.0

    # Verify grid is log-uniform (required for O(1) bilinear interpolation)
    if n_p > 2:
        p_spacings = np.diff(unique_log_p)
        if not np.allclose(p_spacings, dlog_p, rtol=1e-3):
            logger.warning(
                f'P grid is not log-uniform: spacing range '
                f'[{p_spacings.min():.6f}, {p_spacings.max():.6f}] '
                f'vs mean {dlog_p:.6f}. Bilinear interpolation may be inaccurate.'
            )
    if n_t > 2:
        t_spacings = np.diff(unique_log_t)
        if not np.allclose(t_spacings, dlog_t, rtol=1e-3):
            logger.warning(
                f'T grid is not log-uniform: spacing range '
                f'[{t_spacings.min():.6f}, {t_spacings.max():.6f}] '
                f'vs mean {dlog_t:.6f}. Bilinear interpolation may be inaccurate.'
            )

    return {
        'type': 'paleos_unified',
        'density_interp': density_interp,
        'nabla_ad_interp': nabla_ad_interp,
        'density_nn': density_nn,
        'nabla_ad_nn': nabla_ad_nn,
        'p_min': 10.0 ** unique_log_p[0],
        'p_max': 10.0 ** unique_log_p[-1],
        't_min': 10.0 ** unique_log_t[0],
        't_max': 10.0 ** unique_log_t[-1],
        'unique_log_p': unique_log_p,
        'unique_log_t': unique_log_t,
        'logt_valid_min': logt_valid_min,
        'logt_valid_max': logt_valid_max,
        'liquidus_log_p': liquidus_log_p,
        'liquidus_log_t': liquidus_log_t,
        'phase_grid': phase_grid,
        # Fast bilinear interpolation data
        'density_grid': density_grid,
        'nabla_ad_grid': nabla_ad_grid,
        'logp_min': unique_log_p[0],
        'logp_max': unique_log_p[-1],
        'logt_min': unique_log_t[0],
        'logt_max': unique_log_t[-1],
        'dlog_p': dlog_p,
        'dlog_t': dlog_t,
        'n_p': n_p,
        'n_t': n_t,
    }
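
The liquidus extraction ("lowest temperature where the phase is 'liquid'" per pressure row) can be sketched on a small hand-written phase grid. The grid contents and axis values below are invented for illustration, and the vectorized `argmax` form is an alternative to the per-row loop in the listing above, not the code Zalmoxis runs.

```python
import numpy as np

# Hypothetical axes and phase labels (rows: pressure, columns: temperature).
unique_log_p = np.array([9.0, 10.0, 11.0])
unique_log_t = np.array([3.0, 3.5, 4.0, 4.5])
phase_grid = np.array([
    ['solid', 'liquid', 'liquid', 'vapor'],
    ['solid', 'solid', 'liquid', 'liquid'],
    ['solid', 'solid', 'solid', 'solid'],
])

liquid = phase_grid == 'liquid'
has_liquid = liquid.any(axis=1)      # rows that contain any liquid cell
first_idx = liquid.argmax(axis=1)    # index of the first True in each row

# Rows without liquid are dropped, as in the loop-based extraction.
liquidus_log_p = unique_log_p[has_liquid]
liquidus_log_t = unique_log_t[first_idx[has_liquid]]
```

The third pressure row has no liquid cell, so the liquidus arrays have two entries rather than three.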

_fast_bilinear(log_p, log_t, grid, cached)

O(1) bilinear interpolation on a log-uniform grid (scalar).

Optimized for the hot path: called ~2M times per structure solve. Uses direct floor indexing (no rounding, no max/min builtins).

Parameters:

Name    Type     Description                                            Default
log_p   float    log10 of pressure, already clamped to grid bounds.     required
log_t   float    log10 of temperature, already clamped to valid range.  required
grid    ndarray  2D array of values, shape (n_p, n_t).                  required
cached  dict     Cache entry with grid metadata.                        required

Returns:

Type   Description
float  Interpolated value. NaN if any corner is NaN.

Source code in src/zalmoxis/eos/interpolation.py
def _fast_bilinear(log_p, log_t, grid, cached):
    """O(1) bilinear interpolation on a log-uniform grid (scalar).

    Optimized for the hot path: called ~2M times per structure solve.
    Uses direct floor indexing (no rounding, no max/min builtins).

    Parameters
    ----------
    log_p : float
        log10 of pressure, already clamped to grid bounds.
    log_t : float
        log10 of temperature, already clamped to valid range.
    grid : numpy.ndarray
        2D array of values, shape (n_p, n_t).
    cached : dict
        Cache entry with grid metadata.

    Returns
    -------
    float
        Interpolated value. NaN if any corner is NaN.
    """
    n_p_m2 = cached['n_p'] - 2
    n_t_m2 = cached['n_t'] - 2

    if n_p_m2 < 0 or n_t_m2 < 0:
        return grid[0, 0]

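    # O(1) fractional grid coordinates on the uniform log axes. int() below
    # truncates toward zero, which equals floor for in-range (non-negative)
    # values; negative values are clamped to index 0 either way.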
    fp = (log_p - cached['logp_min']) / cached['dlog_p']
    ft = (log_t - cached['logt_min']) / cached['dlog_t']

    ip = int(fp)
    it = int(ft)

    # Clamp to valid range without max/min builtins
    if ip < 0:
        ip = 0
    elif ip > n_p_m2:
        ip = n_p_m2
    if it < 0:
        it = 0
    elif it > n_t_m2:
        it = n_t_m2

    # Fractional position within cell
    dp = fp - ip
    dt = ft - it
    if dp < 0.0:
        dp = 0.0
    elif dp > 1.0:
        dp = 1.0
    if dt < 0.0:
        dt = 0.0
    elif dt > 1.0:
        dt = 1.0

    # Bilinear blend
    omdp = 1.0 - dp
    omdt = 1.0 - dt
    return (
        grid[ip, it] * omdp * omdt
        + grid[ip, it + 1] * omdp * dt
        + grid[ip + 1, it] * dp * omdt
        + grid[ip + 1, it + 1] * dp * dt
    )
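
Two properties of the kernel are easy to check by hand: bilinear interpolation reproduces a function that is linear in (log10 P, log10 T) exactly, and a single NaN corner makes the whole result NaN. The sketch below uses a compact stand-in named `bilinear` that reads the same cache keys; it is a simplified illustration with invented axes, not the optimized function listed above.

```python
import math

import numpy as np


def bilinear(log_p, log_t, grid, cached):
    """Simplified stand-in for the scalar kernel (same cache keys)."""
    fp = (log_p - cached['logp_min']) / cached['dlog_p']
    ft = (log_t - cached['logt_min']) / cached['dlog_t']
    ip = min(max(int(fp), 0), cached['n_p'] - 2)
    it = min(max(int(ft), 0), cached['n_t'] - 2)
    dp = min(max(fp - ip, 0.0), 1.0)
    dt = min(max(ft - it, 0.0), 1.0)
    return (
        grid[ip, it] * (1 - dp) * (1 - dt)
        + grid[ip, it + 1] * (1 - dp) * dt
        + grid[ip + 1, it] * dp * (1 - dt)
        + grid[ip + 1, it + 1] * dp * dt
    )


# Hypothetical uniform axes and a plane f = 2*log_p + 3*log_t sampled on them.
log_p_axis = np.linspace(8.0, 12.0, 5)  # spacing 1.0
log_t_axis = np.linspace(3.0, 4.0, 5)   # spacing 0.25
grid = 2.0 * log_p_axis[:, None] + 3.0 * log_t_axis[None, :]
cached = {'logp_min': 8.0, 'logt_min': 3.0, 'dlog_p': 1.0, 'dlog_t': 0.25,
          'n_p': 5, 'n_t': 5}

value = bilinear(9.3, 3.6, grid, cached)  # plane is reproduced up to round-off

# Knock out one corner of the cell containing the query point.
holed = grid.copy()
holed[1, 2] = np.nan
poisoned = bilinear(9.3, 3.6, holed, cached)
```

The NaN result for `poisoned` is why the loaders also build nearest-neighbor fallbacks from the valid cells.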

fast_bilinear_batch(log_p_arr, log_t_arr, grid, cached)

Vectorized O(1) bilinear interpolation on a log-uniform grid.

Batch version of _fast_bilinear for arrays of query points. Uses numpy vectorized operations instead of Python loops, giving 5-10x speedup over calling _fast_bilinear in a loop or using scipy RegularGridInterpolator for small-to-medium batches.

Parameters:

Name       Type     Description                                                        Default
log_p_arr  ndarray  1D array of log10(pressure) values.                                required
log_t_arr  ndarray  1D array of log10(temperature) values (same length as log_p_arr).  required
grid       ndarray  2D array of values, shape (n_p, n_t).                              required
cached     dict     Cache entry with grid metadata.                                    required

Returns:

Type     Description
ndarray  Interpolated values. NaN where any corner is NaN.

Source code in src/zalmoxis/eos/interpolation.py
def fast_bilinear_batch(log_p_arr, log_t_arr, grid, cached):
    """Vectorized O(1) bilinear interpolation on a log-uniform grid.

    Batch version of ``_fast_bilinear`` for arrays of query points.
    Uses numpy vectorized operations instead of Python loops, giving
    5-10x speedup over calling ``_fast_bilinear`` in a loop or using
    scipy ``RegularGridInterpolator`` for small-to-medium batches.

    Parameters
    ----------
    log_p_arr : numpy.ndarray
        1D array of log10(pressure) values.
    log_t_arr : numpy.ndarray
        1D array of log10(temperature) values (same length as log_p_arr).
    grid : numpy.ndarray
        2D array of values, shape (n_p, n_t).
    cached : dict
        Cache entry with grid metadata.

    Returns
    -------
    numpy.ndarray
        Interpolated values. NaN where any corner is NaN.
    """
    n_p = cached['n_p']
    n_t = cached['n_t']

    if n_p < 2 or n_t < 2:
        return np.full(len(log_p_arr), grid[0, 0])

    logp_min = cached['logp_min']
    logt_min = cached['logt_min']
    dlog_p = cached['dlog_p']
    dlog_t = cached['dlog_t']

    # O(1) index computation for log-uniform grids
    fp = (np.asarray(log_p_arr) - logp_min) / dlog_p
    ft = (np.asarray(log_t_arr) - logt_min) / dlog_t

    # Lower bounding indices (clamp to valid range)
    ip = np.clip(np.floor(fp).astype(int), 0, n_p - 2)
    it = np.clip(np.floor(ft).astype(int), 0, n_t - 2)

    # Fractional positions within cell
    ulp = cached['unique_log_p']
    ult = cached['unique_log_t']
    span_p = ulp[ip + 1] - ulp[ip]
    span_t = ult[it + 1] - ult[it]

    dp = np.where(span_p > 0, (np.asarray(log_p_arr) - ulp[ip]) / span_p, 0.0)
    dt = np.where(span_t > 0, (np.asarray(log_t_arr) - ult[it]) / span_t, 0.0)
    dp = np.clip(dp, 0.0, 1.0)
    dt = np.clip(dt, 0.0, 1.0)

    # Four corners
    v00 = grid[ip, it]
    v01 = grid[ip, it + 1]
    v10 = grid[ip + 1, it]
    v11 = grid[ip + 1, it + 1]

    # Bilinear blend
    return v00 * (1 - dp) * (1 - dt) + v01 * (1 - dp) * dt + v10 * dp * (1 - dt) + v11 * dp * dt
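
The scalar kernel truncates with `int()` while the batch version uses `np.floor`; the two differ for negative fractional coordinates but agree once the index is clamped to `[0, n - 2]`. The short sketch below checks this on a handful of invented coordinates. It computes the in-cell fraction as `fp - ip`, as the scalar kernel does, whereas the batch listing above derives it from the stored axis values.

```python
import numpy as np

# Hypothetical fractional grid coordinates, including out-of-range queries.
fp = np.array([-0.5, 0.0, 1.7, 3.0, 9.9])
n = 5  # number of nodes on the axis, so valid lower indices are 0..3

ip_floor = np.clip(np.floor(fp).astype(int), 0, n - 2)  # batch-style index
ip_trunc = np.clip(fp.astype(int), 0, n - 2)            # int()-style truncation

# In-cell fraction, clamped so out-of-range queries snap to the edge nodes.
dp = np.clip(fp - ip_floor, 0.0, 1.0)
```

Queries below the first node get weight 0 on the upper node and queries beyond the last node get weight 1, so both return the edge value instead of extrapolating.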

_paleos_clamp_temperature(log_p, log_t, cached)

Clamp log10(T) to the per-pressure valid range of a PALEOS table.

Uses O(1) index computation on the log-uniform pressure grid instead of np.interp (which does binary search on 1000+ elements). This is called millions of times in the scalar ODE path.

Parameters:

Name    Type   Description                                                 Default
log_p   float  log10 of pressure in Pa (already clamped to table bounds).  required
log_t   float  log10 of temperature in K.                                  required
cached  dict   PALEOS cache entry.                                         required

Returns:

Type   Description
float  Clamped log10(T).
bool   True if clamping was applied.

Source code in src/zalmoxis/eos/interpolation.py
def _paleos_clamp_temperature(log_p, log_t, cached):
    """Clamp log10(T) to the per-pressure valid range of a PALEOS table.

    Uses O(1) index computation on the log-uniform pressure grid instead
    of np.interp (which does binary search on 1000+ elements). This is
    called millions of times in the scalar ODE path.

    Parameters
    ----------
    log_p : float
        log10 of pressure in Pa (already clamped to table bounds).
    log_t : float
        log10 of temperature in K.
    cached : dict
        PALEOS cache entry.

    Returns
    -------
    float
        Clamped log10(T).
    bool
        True if clamping was applied.
    """
    lt_min = cached['logt_valid_min']
    lt_max = cached['logt_valid_max']

    # O(1) path for log-uniform grids (has dlog_p metadata)
    if 'dlog_p' in cached:
        fp = (log_p - cached['logp_min']) / cached['dlog_p']
        n_p_m2 = cached['n_p'] - 2  # highest valid lower-cell index
        ip = int(fp)
        if ip < 0:
            ip = 0
        elif ip > n_p_m2:
            ip = n_p_m2
        frac = fp - ip
        if frac < 0.0:
            frac = 0.0
        elif frac > 1.0:
            frac = 1.0
        local_tmin = lt_min[ip] + frac * (lt_min[ip + 1] - lt_min[ip])
        local_tmax = lt_max[ip] + frac * (lt_max[ip + 1] - lt_max[ip])
    else:
        # Fallback for legacy/test cache dicts without grid metadata
        ulp = cached['unique_log_p']
        local_tmin = float(np.interp(log_p, ulp, lt_min))
        local_tmax = float(np.interp(log_p, ulp, lt_max))

    # Guard: NaN bounds near table edges
    if not (local_tmin == local_tmin and local_tmax == local_tmax):  # fast NaN check
        return log_t, False

    if log_t < local_tmin:
        return local_tmin, True
    elif log_t > local_tmax:
        return local_tmax, True
    return log_t, False
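
The per-pressure bounds that this clamp consumes come from the ragged edge of the density grid. The sketch below derives them for an invented 3 x 4 grid and then clamps one query that sits a quarter of the way between the first two pressure rows. It assumes every row has at least one finite value (otherwise `argmax` would silently return index 0) and is an illustration, not the library code.

```python
import numpy as np

# Hypothetical temperature axis and density grid with NaN in the hot corner.
unique_log_t = np.array([3.0, 3.5, 4.0, 4.5])
density_grid = np.array([
    [1.0, 2.0, np.nan, np.nan],
    [1.0, 2.0, 3.0, np.nan],
    [1.0, 2.0, 3.0, 4.0],
])

finite = np.isfinite(density_grid)
first = finite.argmax(axis=1)                                        # first finite column
last = density_grid.shape[1] - 1 - finite[:, ::-1].argmax(axis=1)    # last finite column
logt_valid_min = unique_log_t[first]
logt_valid_max = unique_log_t[last]

# Linearly blend the bounds of rows 0 and 1 at fractional position 0.25.
frac = 0.25
local_tmin = logt_valid_min[0] + frac * (logt_valid_min[1] - logt_valid_min[0])
local_tmax = logt_valid_max[0] + frac * (logt_valid_max[1] - logt_valid_max[0])

log_t = 4.2
clamped = min(max(log_t, local_tmin), local_tmax)
was_clamped = clamped != log_t
```

Here the query temperature lies above the valid range at that pressure, so it is pulled down to the blended upper bound.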

_ensure_unified_cache(eos_file, interpolation_functions)

Ensure a unified PALEOS table is loaded into the interpolation cache.

Tries loading from a binary pickle cache first (fast), then falls back to the text table (slow). Saves a pickle cache after the first text load.

Parameters:

Name                     Type  Description                             Default
eos_file                 str   Path to the unified PALEOS table file.  required
interpolation_functions  dict  Shared interpolation cache.             required

Returns:

Type  Description
dict  The cache entry for this file.

Source code in src/zalmoxis/eos/interpolation.py
def _ensure_unified_cache(eos_file, interpolation_functions):
    """Ensure a unified PALEOS table is loaded into the interpolation cache.

    Tries loading from a binary pickle cache first (fast), then falls back
    to the text table (slow). Saves a pickle cache after the first text load.

    Parameters
    ----------
    eos_file : str
        Path to the unified PALEOS table file.
    interpolation_functions : dict
        Shared interpolation cache.

    Returns
    -------
    dict
        The cache entry for this file.
    """
    if eos_file not in interpolation_functions:
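        import os
        import pickle

        # Derive the sidecar path from the file extension. A plain
        # str.replace('.dat', '.pkl') returns eos_file unchanged for names
        # without '.dat', and the pickle dump below would then overwrite
        # the table itself.
        cache_path = os.path.splitext(eos_file)[0] + '.pkl'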
        try:
            with open(cache_path, 'rb') as f:
                interpolation_functions[eos_file] = pickle.load(f)
            logger.debug('Loaded PALEOS cache from %s', cache_path)
        except (FileNotFoundError, EOFError, pickle.UnpicklingError):
            logger.info('Loading PALEOS table from text: %s', eos_file)
            interpolation_functions[eos_file] = load_paleos_unified_table(eos_file)
            # Save binary cache for next time
            try:
                with open(cache_path, 'wb') as f:
                    pickle.dump(interpolation_functions[eos_file], f, protocol=4)
                logger.info('Saved PALEOS binary cache: %s', cache_path)
            except OSError:
                logger.debug('Could not save cache to %s', cache_path)

    return interpolation_functions[eos_file]
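
The load-or-build pattern can be exercised without a real table. In the sketch below, `load_with_cache` and `fake_build` are hypothetical names, the "table" is an empty temporary file, and the sidecar path is derived with `os.path.splitext`, which is a choice made for this example. Only standard-library calls are used.

```python
import os
import pickle
import tempfile


def load_with_cache(path, build, memo):
    """Return memo[path], filling it from a .pkl sidecar or via build(path)."""
    if path not in memo:
        cache_path = os.path.splitext(path)[0] + '.pkl'
        try:
            with open(cache_path, 'rb') as f:
                memo[path] = pickle.load(f)
        except (FileNotFoundError, EOFError, pickle.UnpicklingError):
            memo[path] = build(path)
            try:
                with open(cache_path, 'wb') as f:
                    pickle.dump(memo[path], f, protocol=4)
            except OSError:
                pass  # a read-only directory only costs the speed-up
    return memo[path]


calls = []


def fake_build(path):
    """Stand-in for the slow text parse; records each invocation."""
    calls.append(path)
    return {'type': 'demo', 'n_p': 3}


with tempfile.TemporaryDirectory() as tmp:
    table = os.path.join(tmp, 'table.dat')
    with open(table, 'w'):
        pass  # empty placeholder file
    first = load_with_cache(table, fake_build, {})   # builds, then writes table.pkl
    second = load_with_cache(table, fake_build, {})  # fresh memo, served from pickle
    sidecar_exists = os.path.exists(os.path.join(tmp, 'table.pkl'))
```

Like the function above, this sketch does not compare modification times, so a stale `.pkl` would be reused after the text table changes; deleting the sidecar forces a rebuild.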