# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2020 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
import math
from dataclasses import dataclass
from typing import Callable, Literal, Optional, Protocol, Sequence, Tuple, Union
import numpy as np
from affine import Affine
from .crs import SomeCRS
from .gcp import GCPGeoBox
from .geobox import GeoBox, GeoboxAnchor, GeoBoxBase, gbox_boundary
from .math import (
affine_from_pts,
decompose_rws,
is_affine_st,
is_almost_int,
maybe_int,
snap_affine,
stack_xy,
unstack_xy,
)
from .roi import (
NormalizedROI,
NormalizedSlice,
roi_boundary,
roi_center,
roi_from_points,
roi_is_empty,
scaled_up_roi,
)
from .types import XY, SomeResolution, SomeShape, res_, shape_, xy_
class PointTransform(Protocol):
"""
Invertible point transform.
"""
def __call__(self, pts: Sequence[XY[float]]) -> Sequence[XY[float]]: ...
@property
def back(self) -> "PointTransform": ...
@property
def linear(self) -> Optional[Affine]: ...
class LinearPointTransform:
"""
Point transform within the same projection.
"""
def __init__(self, A: Affine, back: Optional["LinearPointTransform"] = None):
self.A = A
self._back = back
@property
def linear(self) -> Optional[Affine]:
return self.A
@property
def back(self) -> "LinearPointTransform":
if self._back is not None:
return self._back
back = LinearPointTransform(~self.A, self)
self._back = back
return back
def __call__(self, pts: Sequence[XY[float]]) -> Sequence[XY[float]]:
A = self.A
return [xy_(A * pt.xy) for pt in pts]
def __repr__(self) -> str:
return f"LinearPointTransform(\n {self.A!r})"
class GbxPointTransform:
"""
Point Transform between two pixel planes.
Maps pixel coordinates from one geo referenced image to another.
1. Input is source pixel coordinate
2. Compute coordinate in CRS units using pix2wld transform of the source image
3. Project point to the CRS of the destination image
4. Compute destination image pixel coordinate by using wld2pix mapping of the
destination image
"""
def __init__(
self,
src: GeoBoxBase,
dst: GeoBoxBase,
back: Optional["GbxPointTransform"] = None,
):
assert src.crs is not None and dst.crs is not None
self._src = src
self._dst = dst
self._back = back
self._tr = src.crs.transformer_to_crs(dst.crs)
self._clamps: Optional[Tuple[Tuple[float, float], Tuple[float, float]]] = None
if src.crs.geographic:
self._clamps = ((-180, 180), (-90, 90))
@property
def back(self) -> "GbxPointTransform":
if self._back is not None:
return self._back
back = GbxPointTransform(self._dst, self._src, self)
self._back = back
return back
@property
def linear(self) -> Optional[Affine]:
return None
def __call__(self, pts: Sequence[XY[float]]) -> Sequence[XY[float]]:
# pix_src -> X -> X' -> pix_dst
# inv(dst.A)*to_crs(src.A*pt)
xx, yy = np.asarray([pt.xy for pt in pts]).T
xx, yy = self._src.pix2wld(xx, yy)
if self._clamps is not None:
# for global datasets in 4326 pixel edges sometimes reach just outside
# of the valid region due to rounding errors when creating tiff files
# those coordinates can then not be converted properly to destintation crs
range_x, range_y = self._clamps
xx = np.clip(xx, *range_x)
yy = np.clip(yy, *range_y)
xx, yy = self._dst.wld2pix(*self._tr(xx, yy))
return [xy_(x, y) for x, y in zip(xx, yy)]
def __repr__(self) -> str:
return f"GbxPointTransform({self._src!r}, {self._dst!r})"
[docs]
@dataclass
class ReprojectInfo:
"""
Describes computed data loading parameters.
For scale direction is: "scale > 1 --> shrink src to fit dst".
"""
roi_src: NormalizedROI
"""Section of the source image to load."""
roi_dst: NormalizedROI
"""Section of the destination image to update."""
paste_ok: bool
"""
When ``True`` source can be pasted into destination directly.
* Must have same projection
* Must have same pixel size, or same pixel size after shrinking
source with integer scaling
* Sub-pixel translation between the source and the destination images must be lower
than the requested tolerance
"""
read_shrink: int
"""
Highest allowed integer shrink factor on the read side.
Used to pick overview level for reading. A value of ``3`` means you can
shrink every ``3x3`` pixel block of the source image down to a single pixel,
and still have higher resolution than requested.
"""
scale: float
"""Scale change as a single number. (the min of scale2)"""
scale2: XY[float]
"""Full 2D Scale change as an XY."""
transform: PointTransform
"""Mapping from src pixels to destination pixels."""
def get_scale_from_linear_transform(A: Affine) -> XY[float]:
"""
Given a linear transform compute scale change.
1. Y = A*X + t
2. Extract scale components of A
Returns (sx, sy), where sx > 0, sy > 0
"""
_, _, S = decompose_rws(A)
return xy_(abs(S.a), abs(S.e))
def get_scale_at_point(
pt: XY[float], tr: PointTransform, r: Optional[float] = None
) -> XY[float]:
"""
Given an arbitrary locally linear transform estimate scale change around a point.
1. Approximate ``Y = tr(X)`` as ``Y = A*X + t`` in the neighbourhood of pt, for ``X,Y in R2``
2. Extract scale components of ``A``
:param pt: estimate transform around this point
:param r: radius around the point (default is 1)
:param tr: Point transforming function ``Sequence[XY[float]] -> Sequence[XY[float]]``
:return: ``(sx, sy)`` where ``sx > 0, sy > 0``
"""
pts0 = [(0, 0), (-1, 0), (0, -1), (1, 0), (0, 1)]
x0, y0 = pt.xy
if r is None:
XX = [xy_(float(x + x0), float(y + y0)) for x, y in pts0]
else:
XX = [xy_(float(x * r + x0), float(y * r + y0)) for x, y in pts0]
YY = tr(XX)
A = affine_from_pts(XX, YY)
return get_scale_from_linear_transform(A)
def _same_crs_pix_transform(src: GeoBox, dst: GeoBox) -> LinearPointTransform:
assert src.crs == dst.crs
_fwd = (~dst.transform) * src.transform # src -> dst
return LinearPointTransform(_fwd)
def compute_axis_overlap(
Ns: int, Nd: int, s: float, t: float
) -> Tuple[NormalizedSlice, NormalizedSlice]:
"""
s, t define linear transform from destination coordinate space to source
>> x_s = s * x_d + t
Ns -- number of pixels along some dimension of source image: (0, Ns)
Nd -- same as Ns but for destination image
:returns: (slice in the source image,
slice in the destination image)
"""
needs_flip = s < 0
if needs_flip:
# change s, t to map into flipped src, i.e. src[::-1]
s, t = -s, Ns - t
assert s > 0
# x_d = (x_s - t)/s => 1/s * x_s + t*(-1/s)
#
# x_d = s_ * x_s + t_
s_ = 1.0 / s
t_ = -t * s_
if t < 0:
# |<------- ... D
# |<--- ... S
_in = (0, min(math.floor(t_), Nd))
else:
# |<--... D
# |<---------... S
_in = (min(math.floor(t), Ns), 0)
a = math.ceil(Nd * s + t)
if a <= Ns:
# ...----->| D
# ...-------->| S
_out = (max(a, 0), Nd)
else:
# ...-------->| D
# ...----->| S
_out = (Ns, max(0, math.ceil(Ns * s_ + t_)))
src, dst = (slice(_in[i], _out[i]) for i in range(2))
if needs_flip:
# remap src from flipped space to normal
src = slice(Ns - src.stop, Ns - src.start) # type: ignore
return (src, dst)
def box_overlap(
src_shape: SomeShape, dst_shape: SomeShape, ST: Affine
) -> Tuple[NormalizedROI, NormalizedROI]:
"""
Compute overlap between two image planes.
Given two image planes whose coordinate systems are related via scale and
translation only, find overlapping regions within both.
:param src_shape:
Shape of source image plane
:param dst_shape:
Shape of destination image plane
:param ST:
Affine transform with only scale/translation, direction is: ``Xsrc = ST*Xdst``
:returns: ``(src_roi, dst_roi)``
"""
src_shape = shape_(src_shape)
dst_shape = shape_(dst_shape)
(sx, _, tx, _, sy, ty, *_) = ST
s0, d0 = compute_axis_overlap(src_shape.y, dst_shape.y, sy, ty)
s1, d1 = compute_axis_overlap(src_shape.x, dst_shape.x, sx, tx)
return (s0, s1), (d0, d1)
def native_pix_transform(src: GeoBoxBase, dst: GeoBoxBase) -> PointTransform:
"""
direction: from src to dst
.back: goes the other way
.linear: None|Affine linear transform src->dst if transform is linear (i.e. same CRS)
"""
# Special case CRS_in == CRS_out
if isinstance(src, GeoBox) and isinstance(dst, GeoBox) and src.crs == dst.crs:
return _same_crs_pix_transform(src, dst)
return GbxPointTransform(src, dst)
def _pick_read_scale(scale: float, tol: float = 1e-3) -> int:
assert scale > 0
# scale < 1 --> 1
# Else: scale down to nearest integer, unless we can scale up by less than tol
#
# 2.999999 -> 3
# 2.8 -> 2
# 0.3 -> 1
if scale < 1:
return 1
# if close to int from below snap to it
scale = maybe_int(scale, tol)
# otherwise snap to nearest integer below
return int(scale)
def _can_paste(
A: Affine, stol: float = 1e-3, ttol: float = 1e-2
) -> Tuple[bool, Optional[str]]:
"""
Check if can read (possibly with scale) and paste, or do we need to read then reproject.
:param A: Coordinate mapping from dst to src ``X_src = A*X_dst``
:returns: (True, None) if one can just read and paste
:returns: (False, Reason) if pasting is not possible, so need to reproject after reading
"""
if not is_affine_st(A):
return (False, "has rotation or shear")
sx, sy = get_scale_from_linear_transform(A).xy
scale = min(sx, sy)
if not is_almost_int(scale, stol): # non-integer scaling
return False, "non-integer scale"
read_scale = _pick_read_scale(scale)
# A_ maps coords from `dst` to `src.overview[scale]`
A_ = Affine.scale(1 / read_scale, 1 / read_scale) * A
(sx, _, tx, _, sy, ty, *_) = A_ # tx, ty are in dst pixel space
# Expect identity for scale change
if any(abs(abs(s) - 1) > stol for s in (sx, sy)): # not equal scaling across axis?
return False, "sx!=sy, probably"
# Check if sub-pixel translation within bounds
if not all(is_almost_int(t, ttol) for t in (tx, ty)):
return False, "sub-pixel translation"
return True, None
def _relative_rois(
src: GeoBoxBase,
dst: GeoBoxBase,
tr: PointTransform,
pts_per_side: int,
padding: int,
align: Optional[int],
):
_XY = tr.back(unstack_xy(gbox_boundary(dst, pts_per_side)))
roi_src = roi_from_points(stack_xy(_XY), src.shape, padding, align=align)
if roi_is_empty(roi_src):
return (roi_src, np.s_[0:0, 0:0])
# project src roi back into dst and compute roi from that
xy = tr(unstack_xy(roi_boundary(roi_src, pts_per_side)))
# `padding=0` is to avoid adding padding twice
roi_dst = roi_from_points(stack_xy(xy), dst.shape, padding=0)
return (roi_src, roi_dst)
[docs]
def compute_reproject_roi(
src: GeoBoxBase,
dst: GeoBoxBase,
ttol: float = 0.05,
stol: float = 1e-3,
padding: Optional[int] = None,
align: Optional[int] = None,
) -> ReprojectInfo:
"""
Compute reprojection information.
Given two GeoBoxes find the region within the source GeoBox that overlaps
with the destination GeoBox, and also compute the scale factor (>1 means
shrink). Scale is chosen such that if you apply it to the source image
before reprojecting, then reproject will have roughly no scale component.
So we are breaking up reprojection into two stages:
1. Scale in the native pixel CRS
2. Reprojection (possibly non-linear with CRS change)
.. code-block:: text
- src[roi] -> scale -> reproject -> dst (using native pixels)
- src(scale)[roi(scale)] -> reproject -> dst (using overview image)
Here ``roi`` is "minimal", padding is configurable though, so you only read what you need.
Also scale can be used to pick the right kind of overview level to read.
Applying reprojection in two steps allows us to use pre-computed overviews,
particularly useful when shrink factor is large. But even for data sources
without overviews there are advantages for shrinking source image before
applying reprojection: mainly quality of the output (reduces aliasing for
large shrink factors), improved efficiency of the computation is likely as
well.
Also compute and return ROI of the dst geobox that is affected by src.
If padding is ``None`` "appropriate" padding will be used depending on the
transform between ``src<>dst``:
* No padding beyond sub-pixel alignment if Scale+Translation
* 1 pixel source padding in all other cases
:param src:
Geobox of the source image
:param dst:
Geobox of the destination image
:param padding:
Optional padding in source pixels
:param align:
Optional pixel alignment in pixels, used on both source and destination.
:param tol:
Sub-pixel translation tolerance as pixel fraction.
:param stol:
Scale tolerance for pasting
:returns:
An instance of ``ReprojectInfo`` class.
"""
# pylint: disable=too-many-locals
pts_per_side = 5
tr = native_pix_transform(src, dst)
if tr.linear is None:
padding = 1 if padding is None else padding
roi_src, roi_dst = _relative_rois(
src, dst, tr, pts_per_side=pts_per_side, padding=padding, align=align
)
if not roi_is_empty(roi_dst):
center_pt = xy_(roi_center(roi_dst)[::-1])
scale2 = get_scale_at_point(center_pt, tr.back)
scale = min(scale2.xy)
read_shrink = _pick_read_scale(scale)
else:
scale = 0
scale2 = XY(x=0, y=0)
read_shrink = 1
return ReprojectInfo(
roi_src=roi_src,
roi_dst=roi_dst,
scale=scale,
scale2=scale2,
paste_ok=False,
read_shrink=read_shrink,
transform=tr,
)
# Same projection case
#
A = tr.back.linear # dst->src
scale2 = get_scale_from_linear_transform(A)
scale = min(scale2.xy)
read_shrink = _pick_read_scale(scale)
paste_ok = False
tight_ok = align in (None, 0) and padding in (0, None)
if tight_ok:
paste_ok, _ = _can_paste(A, ttol=ttol, stol=stol)
if paste_ok:
if read_shrink == 1:
A_ = snap_affine(A, ttol=ttol, stol=stol)
roi_src, roi_dst = box_overlap(src.shape, dst.shape, A_)
else:
# compute overlap in scaled down image, then upscale source overlap
assert isinstance(src, GeoBox)
_src = src.zoom_out(read_shrink)
A_ = snap_affine(Affine.scale(1 / read_shrink) * A, ttol=ttol, stol=stol)
roi_src, roi_dst = box_overlap(_src.shape, dst.shape, A_)
roi_src = scaled_up_roi(roi_src, read_shrink)
else:
padding = 1 if padding is None else padding
roi_src, roi_dst = _relative_rois(
src, dst, tr, pts_per_side=2, padding=padding, align=align
)
return ReprojectInfo(
roi_src=roi_src,
roi_dst=roi_dst,
scale=scale,
scale2=scale2,
paste_ok=paste_ok,
read_shrink=read_shrink,
transform=tr,
)
[docs]
def compute_output_geobox(
gbox: Union[GeoBox, GCPGeoBox],
crs: SomeCRS,
*,
resolution: Union[SomeResolution, Literal["auto", "fit", "same"]] = "auto",
shape: Union[SomeShape, int, None] = None,
tight: bool = False,
anchor: GeoboxAnchor = "default",
tol: float = 0.01,
round_resolution: Union[None, bool, Callable[[float, str], float]] = None,
) -> GeoBox:
"""
Compute output ``GeoBox``.
Find best fitting, axis aligned GeoBox in a different coordinate reference
given source ``GeoBox`` on input.
:param gbox:
Source geobox.
:param crs:
Desired CRS of the output
:param resolution:
* "same" use exactly the same resolution as src
* "fit" use center pixel to determine scale change between the two
* | "auto" is to use the same resolution on the output if CRS units are
| the same between the source and destination and otherwise use "fit"
* Ignored if ``shape=`` is supplied
* Else resolution in the units of the output crs
:param shape:
Span that many pixels, if it's a single number then span that many pixels
along the longest dimension, other dimension will be computed to maintain
roughly square pixels. Takes precedence over ``resolution=`` parameter.
:param tight:
By default output pixel grid is adjusted to align pixel edges to X/Y axis,
suppling ``tight=True`` produces unaligned geobox on the output.
:param anchor:
Control pixel snapping, default is to snap pixel edge to ``X=0,Y=0``.
Ignored when ``tight=True`` is supplied.
:param tol:
Fraction of the output pixel that can be ignored, defaults to 1/100.
Bounding box of the output geobox is allowed to be smaller by that amount
than transformed footprint of the original.
:param round_resolution:
``round_resolution(res: float, units: str) -> float``
:return:
Similar resolution, axis aligned geobox that fully encloses source one
but in a different projection.
"""
# pylint: disable=too-many-locals,too-many-arguments
src_crs = gbox.crs
assert src_crs is not None
# figure out "true" dts_crs (handles "utm" -> actual CRS)
bbox = gbox.footprint(crs, buffer=0.9, npoints=100).boundingbox
dst_crs = bbox.crs
assert dst_crs is not None
if (
dst_crs == src_crs
and resolution in ("auto", "same")
and shape is None
and anchor == "default"
and isinstance(gbox, GeoBox)
):
return gbox
same_units = src_crs.units == dst_crs.units
if shape is not None:
res: Optional[SomeResolution] = None
elif resolution == "same":
res = gbox.resolution
elif resolution == "auto" and same_units:
res = gbox.resolution
elif resolution in ("fit", "auto"):
# get initial resolution by computing 1x1 bounding box of the center pixel
#
cp = gbox.center_pixel
cp_bbox = cp.extent.to_crs(dst_crs).boundingbox
dst_ = GeoBox.from_bbox(cp_bbox, dst_crs, shape=(1, 1), tight=True)
# further adjust that via fitting
# Y = tr(X) => Y ~= s*X + t
# where X is in `dst_` and Y is in `gbox`
# .. so a better fit resolution is `_dst.res / s`
# .. but we want square pixels, so pick average between x and y
sx, sy = get_scale_at_point(xy_(0.5, 0.5), native_pix_transform(dst_, cp)).xy
# always produces square pixels on output with inverted Y axis
avg_res = (abs(dst_.resolution.x / sx) + abs(dst_.resolution.y / sy)) / 2
if round_resolution is not None:
if isinstance(round_resolution, bool):
if round_resolution:
avg_res = round(avg_res, 0)
else:
avg_res = round_resolution(avg_res, dst_crs.units[0])
res = res_(avg_res)
else:
if isinstance(resolution, str):
raise ValueError(
f"Resolution ought to be one of: same,auto,fit, not '{resolution}'"
)
res = res_(resolution)
return GeoBox.from_bbox(
bbox,
dst_crs,
shape=shape,
resolution=res,
tight=tight,
anchor=anchor,
tol=tol,
)