# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2020 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
Various mathy helpers.
Minimal dependencies in this module.
"""
from __future__ import annotations
from math import ceil, floor, fmod, isfinite, log2
from typing import (
Any,
Iterator,
List,
Literal,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)
import numpy as np
from affine import Affine
from numpy.polynomial.polynomial import polygrid2d, polyval2d
from .types import (
XY,
AnchorEnum,
Resolution,
SomeResolution,
SomeShape,
res_,
resxy_,
shape_,
xy_,
)
AffineX = TypeVar("AffineX", np.ndarray, Affine)
[docs]
def maybe_zero(x: float, tol: float) -> float:
"""Turn almost zeros to actual zeros."""
if abs(x) < tol:
return 0
return x
[docs]
def split_float(x: float) -> Tuple[float, float]:
"""
Split float number into whole and fractional parts.
Adding the two numbers back together should result in the original value.
Fractional part is always in the ``(-0.5, +0.5)`` interval, and whole part
is equivalent to ``round(x)``.
:param x: floating point number
:return: ``whole, fraction``
"""
if not isfinite(x):
return (x, 0)
x_part = fmod(x, 1.0)
x_whole = x - x_part
if x_part > 0.5:
x_part -= 1
x_whole += 1
elif x_part < -0.5:
x_part += 1
x_whole -= 1
return (x_whole, x_part)
[docs]
def maybe_int(x: float, tol: float) -> Union[int, float]:
"""
Turn almost ints to actual ints.
pass through other values unmodified.
"""
if not isfinite(x):
return x
x_whole, x_part = split_float(x)
if abs(x_part) < tol: # almost int
return int(x_whole)
return x
[docs]
def snap_scale(s: float, tol: float = 1e-6) -> float:
"""
Snap scale.
Snap scale to the nearest integer or simple fractions in the form ``1/<int>`` if within
tolerance.
:return: ``s`` if too far from snap
:return: snapped version of ``s`` when within ``tol`` of the integer scale.
"""
if abs(s) >= 1 - tol:
return maybe_int(s, tol)
# Check of s is 0
if abs(s) < tol:
return s
# Check for simple fractions
s_inv = 1 / s
s_inv_snapped = maybe_int(s_inv, tol)
if s_inv_snapped is s_inv:
return s
return 1 / s_inv_snapped
[docs]
def align_down(x: int, align: int) -> int:
"""
Align integer down.
:return:
``y`` such that ``y % align == 0`` and ``y <= x`` and ``(x - y) < align``
"""
return x - (x % align)
[docs]
def align_up(x: int, align: int) -> int:
"""
Align integer up.
:return:
``y`` such that ``y % align == 0`` and ``y >= x`` and ``(y - x) < align``
"""
return align_down(x + (align - 1), align)
def align_up_pow2(x: int) -> int:
"""
Align up to the nearest power of 2.
:return:
Smallest ``y`` such that ``y >= x`` and ``y == 2**n`` for some integer ``n``.
"""
if x <= 0:
return 1
return 2 ** int(ceil(log2(x)))
def align_down_pow2(x: int) -> int:
"""
Align down to the nearest power of 2.
:return:
Largest ``y`` such that ``y <= x`` and ``y == 2**n`` for some integer ``n``.
"""
y = align_up_pow2(x)
if y > x:
y = y // 2
return y
[docs]
def clamp(x, lo, up):
"""Clamp ``x`` to be ``lo <= x <= up``."""
assert lo <= up
return lo if x < lo else up if x > up else x
[docs]
def is_almost_int(x: float, tol: float) -> bool:
"""
Check if number is close enough to an integer.
:param x: number to check
:param tol: tolerance to use.
"""
if not isfinite(x):
return False
x = abs(fmod(x, 1))
if x > 0.5:
x = 1 - x
return x < tol
def _snap_edge_pos(x0: float, x1: float, res: float, tol: float) -> Tuple[float, int]:
assert res > 0
assert x1 >= x0
_x0 = floor(maybe_int(x0 / res, tol))
_x1 = ceil(maybe_int(x1 / res, tol))
nx = max(1, _x1 - _x0)
return _x0 * res, nx
def _snap_edge(x0: float, x1: float, res: float, tol: float) -> Tuple[float, int]:
assert x1 >= x0
if res > 0:
return _snap_edge_pos(x0, x1, res, tol)
_tx, nx = _snap_edge_pos(x0, x1, -res, tol)
tx = _tx + nx * (-res)
return tx, nx
def snap_grid(
x0: float, x1: float, res: float, off_pix: Optional[float] = 0, tol: float = 1e-6
) -> Tuple[float, int]:
"""
Compute grid snapping for single axis.
:param x0: In point ``x0 <= x1``
:param x1: Out point ``x0 <= x1``
:param res: Pixel size and direction (can be negative)
:param off_pix:
Pixel fraction to align to ``x=0``.
0 - edge aligned
0.5 - center aligned
None - don't snap
:return: ``tx, nx`` that defines 1-d grid, such that ``x0`` and ``x1`` are within edge pixels.
"""
assert (off_pix is None) or (0 <= off_pix < 1)
if off_pix is None:
if res > 0:
nx = ceil(maybe_int((x1 - x0) / res, tol))
return x0, max(1, nx)
nx = ceil(maybe_int((x1 - x0) / (-res), tol))
return x1, max(nx, 1)
off = off_pix * abs(res)
_tx, nx = _snap_edge(x0 - off, x1 - off, res, tol)
return _tx + off, nx
[docs]
def data_resolution_and_offset(
data, fallback_resolution: Optional[float] = None
) -> Tuple[float, float]:
"""
Compute resolution and offset from x/y axis data.
Only uses first two coordinate values, assumes that data is regularly
sampled.
:returns: ``(resolution, offset)``
"""
if not isinstance(data, np.ndarray):
data = data.values
if data.size < 2:
if data.size < 1:
raise ValueError("Can't calculate resolution for empty data")
if fallback_resolution is None:
raise ValueError("Can't calculate resolution with data size < 2")
res = fallback_resolution
off = data[0] - 0.5 * res
return res, float(off)
x0, x1 = map(float, data[:2])
# x0 = 0*s + t + s/2
# x1 = 1*s + t + s/2
####################
# s = x1 - x0
# 2*t = 3*x0 - x1
return x1 - x0, (3 * x0 - x1) / 2
[docs]
def affine_from_axis(
xx, yy, fallback_resolution: Optional[SomeResolution] = None
) -> Affine:
"""
Compute Affine transform from axis.
Transform direction is from pixel coordinates to CRS units of ``X/Y`` axis.
:param xx:
``X`` axis coordinates
:param yy:
``Y`` axis coordinates
:param fallback_resolution:
Resolution to assume for single element axis.
.. code-block:: text
(0,0) in pixel space is defined as top left corner of the top left pixel
|
V 0 1
+---+---+
0 | | |
+---+---+
1 | | |
+---+---+
Only uses first two coordinate values, assumes that data is regularly
sampled.
:raises:
:py:class:`ValueError` when any axis is empty.
:raises:
:py:class:`ValueError` when any axis has single value and fallback
resolution was not supplied.
"""
frx: Optional[float] = None
fry: Optional[float] = None
if fallback_resolution is not None:
frx, fry = res_(fallback_resolution).xy
xres, xoff = data_resolution_and_offset(xx, frx)
yres, yoff = data_resolution_and_offset(yy, fry)
return Affine(xres, 0.0, xoff, 0.0, yres, yoff)
[docs]
def apply_affine(
A: Affine, x: np.ndarray, y: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""
Broadcast ``A*(x_i, y_i)`` across all elements of ``x/y``.
Arrays could be in any shape (usually 2d image).
:param A: Affine transform to apply
:param x: x coordinates (any shape)
:param y: y coordinates (same shape as ``x``)
:returns: Transformed coordinate as two arrays of the same shape as input ``(x', y')``
"""
shape = x.shape
A = np.asarray(A).reshape(3, 3)
t = A[:2, -1].reshape((2, 1))
A = A[:2, :2]
x, y = A @ np.vstack([x.ravel(), y.ravel()]) + t
x, y = (a.reshape(shape) for a in (x, y))
return (x, y)
[docs]
def split_translation(t: XY[float]) -> Tuple[XY[float], XY[float]]:
"""
Split translation into pixel aligned and sub-pixel parts.
Subpixel translation is guaranteed to be in ``[-0.5, +0.5]`` range.
.. code-block:: python
x + t = x + t_whole + t_subpix
:param t: Translation as :py:class:`~odc.geo.XY`
:returns: ``(t_whole, t_subpix)``
"""
_tt = t.map(split_float)
whole = _tt.map(lambda x: x[0])
part = _tt.map(lambda x: x[1])
return whole, part
[docs]
def is_affine_st(A: Affine, tol: float = 1e-10) -> bool:
"""
Check if transfrom is pure scale and translation.
:return: ``True`` if Affine transform has scale and translation components only
:return: ``False`` if there is non-zero rotation or skew
"""
(_, wx, _, wy, _, _, *_) = A
return abs(wx) < tol and abs(wy) < tol
def approx_equal_affine(a: Affine, b: Affine, tol: float = 1e-6) -> bool:
"""
Check if two affine matrices are approximately equal.
"""
sx, z1, tx, z2, sy, ty = map(lambda v: maybe_int(v, tol), (~a * b)[:6])
return (sx, z1, tx, z2, sy, ty) == (1, 0, 0, 0, 1, 0)
[docs]
def snap_affine(
A: Affine, ttol: float = 1e-3, stol: float = 1e-6, tol: float = 1e-8
) -> Affine:
"""
Snap scale and translation parts to integer when close enough.
When scale is less than 1 then attempt snapping to ``1/<int>``.
If input has rotation/shear component then return unchanged.
:param A: Affine matrix
:param ttol: translation tolerance, defaults to 1e-3
:param stol: scale tolerance, defaults to 1e-6
:param tol: rotation tolerance, defaults to 1e-10
:return: Adjusted Affine matrix.
:return: Input Affine matrix when rotation/shear is present.
"""
sx, wx, tx, wy, sy, ty, *_ = A
# has rotation component
if abs(wx) > tol or abs(wy) > tol:
return A
sx_ = snap_scale(sx, stol)
sy_ = snap_scale(sy, stol)
tx_ = maybe_int(tx, ttol)
ty_ = maybe_int(ty, ttol)
return Affine(sx_, 0, tx_, 0, sy_, ty_)
def decompose_rws(A: AffineX) -> Tuple[AffineX, AffineX, AffineX]:
"""
Compute decomposition Affine matrix sans translation into Rotation, Shear and Scale.
Find matrices ``R,W,S`` such that ``A = R W S`` and
.. code-block:
R [ca -sa] W [1, w] S [sx, 0]
[sa ca] [0, 1] [ 0, sy]
.. note:
There are ambiguities for negative scales.
* ``R(90)*S(1,1) == R(-90)*S(-1,-1)``
* ``(R*(-I))*((-I)*S) == R*S``
:return: Rotation, Shear, Scale ``2x2`` matrices
"""
# pylint: disable=too-many-locals
if isinstance(A, Affine):
def to_affine(m, t=(0, 0)):
a, b, d, e = m.ravel()
c, f = t
return Affine(a, b, c, d, e, f)
(a, b, c, d, e, f, *_) = A
R, W, S = decompose_rws(np.asarray([[a, b], [d, e]], dtype="float64"))
return to_affine(R, (c, f)), to_affine(W), to_affine(S)
assert A.shape == (2, 2)
WS = np.linalg.cholesky(A.T @ A).T
R = A @ np.linalg.inv(WS)
if np.linalg.det(R) < 0:
R[:, -1] *= -1
WS[-1, :] *= -1
ss = np.diag(WS)
S = np.diag(ss)
W = WS @ np.diag(1.0 / ss)
return R, W, S
def stack_xy(pts: Sequence[XY[float]]) -> np.ndarray:
"""Turn into an ``Nx2`` ndarray of floats in X,Y order."""
return np.vstack([pt.xy for pt in pts])
def unstack_xy(pts: np.ndarray) -> List[XY[float]]:
"""Turn ``Nx2`` array in X,Y order into a list of XY points."""
assert pts.ndim == 2 and pts.shape[1] == 2
return [xy_(pt) for pt in pts]
[docs]
def norm_xy(
pts: np.ndarray, out: Optional[np.ndarray] = None
) -> Tuple[np.ndarray, Affine]:
"""
Normalize ``Nx2`` points.
Scale and translate such that mean is at 0 and mean distance from 0 is sqrt(2).
:returns: Normalized array and affine matrix of the normalization.
"""
assert pts.ndim == 2 and pts.shape[1] == 2
assert out is None or out.shape == pts.shape
_mean = pts.mean(axis=0)
XX = np.subtract(pts, _mean, out=out)
sx = (((XX**2).sum(axis=1) * 0.5) ** -0.5).mean()
XX *= sx
tx, ty = -_mean * sx
A = Affine(sx, 0, tx, 0, sx, ty)
return XX, A
[docs]
def affine_from_pts(X: Sequence[XY[float]], Y: Sequence[XY[float]]) -> Affine:
"""
Given points ``X,Y`` compute ``A``, such that: ``Y = A*X``.
Needs at least 3 points.
"""
assert len(X) == len(Y)
assert len(X) >= 3
n = len(X)
YY = stack_xy(Y)
XX = np.ones((n, 3), dtype="float64")
for i, pt in enumerate(X):
XX[i, :2] = pt.xy
mm, *_ = np.linalg.lstsq(XX, YY, rcond=-1)
a, d, b, e, c, f = mm.ravel()
return Affine(a, b, c, d, e, f)
def resolution_from_affine(A: Affine) -> Resolution:
"""
Compute resolution from Affine matrix.
Deals with rotated case when needed.
"""
if is_affine_st(A):
rx, _, _, _, ry, *_ = A
return resxy_(rx, ry)
_, _, A_ = decompose_rws(A)
rx, _, _, _, ry, *_ = A_
return resxy_(rx, ry)
[docs]
def edge_index(shape: SomeShape, closed: bool = False) -> Iterator[Tuple[int, int]]:
"""
Like ``ndindex`` in numpy but for edge locations and limited to 2d.
Returns a sequence of indexes along the edge of a 2d array of a given shape.
Order is fixed, starting from ``(0, 0), (0, 1)...``. See example below
.. code-block::
# for shape=(10, 8)
(0, 0), (0, 1) ... (0, 7),
(1, 7), (2, 7) ... (9, 7),
(9, 6), (9, 5) ... (9, 0),
(8, 0), (7, 0) ... (1, 0),
(0, 0) # Back to (0, 0) if closed=True
"""
nx, ny = shape_(shape).xy
ix, iy = 0, 0
for ix in range(nx):
yield (0, ix)
for iy in range(1, ny):
yield (iy, ix)
for ix in range(ix - 1, -1, -1):
yield (iy, ix)
for iy in range(iy - 1, 0, -1):
yield (iy, ix)
if closed:
yield (0, 0)
[docs]
def quasi_random_r2(
n: int,
shape: Optional[SomeShape] = None,
offset: int = 0,
) -> np.ndarray:
"""
Generate quasi-random set of points.
Returns quasi-random points in ``[0, 1)`` range. If ``shape=(ny, nx)`` is
supplied, points are scaled to ``[0, nx), [0, ny)`` range.
:param n: Number of points to generate
:param offset: Generate from that offset in the sequence
:returns: ``nx2`` numpy array
References: http://extremelearning.com.au/unreasonable-effectiveness-of-quasirandom-sequences/
"""
idx = np.arange(offset, offset + n, dtype="float32")
aa = (0.7548776662466927, 0.5698402909980532)
xy = np.fmod(np.outer(idx, aa), 1)
if shape is not None:
nx, ny = shape_(shape).wh
xy[:, 0] *= nx
xy[:, 1] *= ny
return xy
[docs]
class Bin1D:
"""
Class for translating continous coordinates to bin index.
Binning is defined using following parameters:
:param sz: Bin size (positive floating point number)
:param origin: Location of the left edge of bin ``0``
:param direction: Direction of the bin index ``+1|-1``
"""
__slots__ = ("sz", "origin", "direction")
[docs]
def __init__(self, sz: float, origin: float = 0.0, direction: Literal[1, -1] = 1):
"""
Construct :py:class:`~odc.geo.math.Bin1D` object.
:param sz:
Size of each bin, must be positive
:param origin:
Location of the left edge of bin ``0``, defaults to ``0.0``
:param direction:
Default is to increment bin index left to right, supply ``-1`` to go the other way
"""
assert direction in (-1, 1)
assert sz > 0
self.sz = sz
self.origin = origin
self.direction = direction
def __getitem__(self, idx: int) -> Tuple[float, float]:
"""Convert index to interval."""
_x = idx * self.sz * self.direction + self.origin
return _x, _x + self.sz
[docs]
def bin(self, x: float) -> int:
"""Lookup bin index that ``x`` falls into."""
ix = floor((x - self.origin) / self.sz)
return int(self.direction * ix)
def __eq__(self, other) -> bool:
if not isinstance(other, Bin1D):
return False
return (
self.sz == other.sz
and self.origin == other.origin
and self.direction == other.direction
)
# pylint: disable=redefined-builtin
[docs]
@staticmethod
def from_sample_bin(
idx: int, bin: Tuple[float, float], direction: Literal[1, -1] = 1
) -> "Bin1D":
"""
Construct :py:class:`~odc.geo.math.Bin1D` from a sample.
:param idx:
Index of a sample bin
:param bin:
``x0, x1`` bin edges
:param direction:
Default is to increment bin index left to right, supply ``-1`` to go the other way
"""
x0, x1 = bin
assert x0 < x1
sz = x1 - x0
origin = x0 - sz * idx * direction
return Bin1D(sz, origin, direction=direction)
[docs]
class Poly2d:
"""
Wrapper around polyval2d with input normalization.
"""
[docs]
def __init__(self, cc: np.ndarray, A: Affine) -> None:
assert cc.shape in [(3, 3, 2), (2, 2, 2)]
tol = 1e-6
self._cc = cc
self._A = A
self._safe_to_grid = False
sx, zx, tx, zy, sy, ty, *_ = A
if abs(zx) < tol and abs(zy) < tol:
self._norm = lambda x, y: (np.polyval([sx, tx], x), np.polyval([sy, ty], y))
self._safe_to_grid = True
else:
self._norm = lambda x, y: A * (x, y)
def __call__(self, x: Any, y: Any = None) -> Any:
"""
Evaluate at points (x, y).
When y is None, x is assumed to be Nx2 array.
"""
if y is None:
return self.__call__(x[..., 0], x[..., 1]).T
x, y = self._norm(x, y)
return polyval2d(x, y, self._cc)
[docs]
def grid2d(self, x: Any, y: Any) -> Any:
"""
Evaluate on the Cartesian product of x and y.
"""
assert self._safe_to_grid
x, y = self._norm(x, y)
return polygrid2d(x, y, self._cc)
[docs]
@staticmethod
def fit(aa: np.ndarray, bb: np.ndarray) -> "Poly2d":
"""
Fit 2d polynomial that minimizes ``poly(aa) - bb``.
Where ``aa``,``bb`` are point correspondences of ``Nx2`` shape.
"""
assert aa.shape[1] == 2
assert aa.shape == bb.shape
N = aa.shape[0]
if N < 3:
raise ValueError(f"Need at least 3 points, got {N}")
aa_, Ain = norm_xy(aa)
bb_, Ab = norm_xy(bb)
if N >= 9:
return Poly2d._fit9(aa_, Ain, bb_, Ab)
if N >= 4:
return Poly2d._fit4(aa_, Ain, bb_, Ab)
return Poly2d._fit3(aa_, Ain, bb_, Ab)
@staticmethod
def _fit9(aa: np.ndarray, Ain: Affine, bb: np.ndarray, Ab: Affine) -> "Poly2d":
N = aa.shape[0]
assert N >= 9
x, y = aa.T
AA = np.empty((N, 9), dtype="float64")
# 1, y, y^2
# x, x*y, x*y^2
# x^2, x^2*y, x^2*y^2
AA[:, 0] = 1
AA[:, 1] = y
AA[:, 2] = y * y
AA[:, 3] = x
AA[:, 4] = x * y
AA[:, 5] = AA[:, 4] * y
AA[:, 6] = x * x
AA[:, 7] = AA[:, 6] * y
AA[:, 8] = AA[:, 7] * y
cc, *_ = np.linalg.lstsq(AA, bb, rcond=-1)
# denorm output side, assumes `sx==sy`
s, _, tx, _, _, ty, *_ = ~Ab
cc = cc * s
cc[0, :2] += (tx, ty)
cc = cc.reshape(3, 3, 2)
return Poly2d(cc, Ain)
@staticmethod
def _fit4(aa: np.ndarray, Ain: Affine, bb: np.ndarray, Ab: Affine) -> "Poly2d":
N = aa.shape[0]
assert N >= 4
x, y = aa.T
AA = np.empty((N, 4), dtype="float64")
# 1, y
# x, x*y
AA[:, 0] = 1
AA[:, 1] = y
AA[:, 2] = x
AA[:, 3] = x * y
cc, *_ = np.linalg.lstsq(AA, bb, rcond=-1)
assert cc.shape == (4, 2)
# denorm output side, assumes `sx==sy`
s, _, tx, _, _, ty, *_ = ~Ab
cc = cc * s
cc[0, :2] += (tx, ty)
cc = cc.reshape(2, 2, 2)
return Poly2d(cc, Ain)
@staticmethod
def _fit3(aa: np.ndarray, Ain: Affine, bb: np.ndarray, Ab: Affine) -> "Poly2d":
N = aa.shape[0]
assert N >= 3
x, y = aa.T
AA = np.empty((N, 3), dtype="float64")
# 1, y
# x, _
# 1, y, x
AA[:, 0] = 1
AA[:, 1] = y
AA[:, 2] = x
cc, *_ = np.linalg.lstsq(AA, bb, rcond=-1)
assert cc.shape == (3, 2)
# denorm output side, assumes `sx==sy`
s, _, tx, _, _, ty, *_ = ~Ab
cc = cc * s
cc[0, :2] += (tx, ty)
cc = np.vstack([cc, np.asarray([0, 0])]).reshape(2, 2, 2)
return Poly2d(cc, Ain)
def extract_anchor(pix2wld: Affine, tol: float = 1e-3) -> AnchorEnum | XY[float]:
def _anchor(px: float, tol: float) -> float:
_, x = split_float(px) # x in (-0.5, +0.5)
x = (1 + x) if x < 0 else x # x in [0, 1)
x = snap_scale(maybe_zero(x, tol), tol)
return 0 if x >= 1 else x
anchor = tuple(_anchor(px, tol) for px in (~pix2wld) * (0, 0))
if anchor == (0, 0):
return AnchorEnum.EDGE
if anchor == (0.5, 0.5):
return AnchorEnum.CENTER
return xy_(anchor)