Source code for capella_console_client.client

import logging
import sys
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Any, cast

from capella_console_client.assets import (
    DownloadRequest,
    _derive_stac_id,
    _filter_items_by_product_types,
    _gather_download_requests,
    _get_asset_bytesize,
    _perform_download,
)
from capella_console_client.config import CONSOLE_API_URL
from capella_console_client.enumerations import AssetType, ProductType
from capella_console_client.exceptions import (
    InsufficientFundsError,
    NoValidStacIdsError,
    OrderRejectedError,
    TaskNotCompleteError,
)
from capella_console_client.logconf import logger
from capella_console_client.order import get_non_expired_orders, get_order
from capella_console_client.repeat_request import cancel_repeat_requests, create_repeat_request, update_repeat_requests
from capella_console_client.report import print_cancelation_result
from capella_console_client.s3 import S3Path
from capella_console_client.search import (
    RepeatRequestSearch,
    RepeatRequestSearchResult,
    StacSearch,
    StacSearchResult,
    TaskingRequestSearch,
    TaskingRequestSearchResult,
)
from capella_console_client.session import CapellaConsoleSession
from capella_console_client.sort import _sort_stac_items
from capella_console_client.tasking_request import (
    _task_contains_status,
    cancel_tasking_requests,
    create_tasking_request,
    get_tasking_request,
    update_tasking_requests,
)
from capella_console_client.validate import (
    _compact_unique,
    _validate_and_filter_asset_types,
    _validate_and_filter_product_types,
    _validate_and_filter_stac_ids,
    _validate_stac_id_or_stac_items,
    _validate_uuid,
    _validate_uuids,
)


[docs]class CapellaConsoleClient: """ API client for https://api.capellaspace.com. API docs: https://docs.capellaspace.com/accessing-data/searching-for-data Args: api_key: api key for api.capellaspace.com token: JWT access token verbose: flag to enable verbose logging no_token_check: do not check if provided JWT token or API KEY is valid base_url: Capella console API base URL override search_url: Capella catalog/search/ override no_auth: bypass authentication NOTE: not providing either `api_key` (can be set by CAPELLA_API_KEY env) or `token` will prompt you for `api_key`, which is not what you want in a script NOTE: precedence order (high to low) 1. API key 2. JWT token """ def __init__( self, api_key: str | None = None, token: str | None = None, verbose: bool = False, no_token_check: bool = False, base_url: str | None = CONSOLE_API_URL, search_url: str | None = None, no_auth: bool = False, ): self._set_verbosity(verbose) self._sesh = CapellaConsoleSession(base_url=base_url, search_url=search_url, verbose=verbose) if not no_auth: self._sesh.authenticate(api_key, token, no_token_check) def _set_verbosity(self, verbose: bool = False): self.verbose = verbose logger.setLevel(logging.WARNING) if verbose: logger.setLevel(logging.INFO) # USER
[docs] def whoami(self) -> dict[str, Any]: """ display user info Returns: Dict[str, Any]: return of GET /user """ resp = self._sesh.get("/user") return resp.json()
# TASKING
[docs] def create_tasking_request(self, **kwargs) -> dict[str, Any]: """ Create a new tasking request Find more information at https://docs.capellaspace.com/constellation-tasking/tasking-requests Args: archive_holdback: If defined will specify a time period during which the resulting imagery will be kept from the publicly accessible archive. One of "none", "one_year", "thirty_day", "permanent". Default: "none" asc_dsc: Constraint on ascending/descending pass. One of "ascending", "descending", "either". Default: "either" azimuth_angle_max: clockwise angle with respect to North in a topocentric geodetic ENZ coordinate system from the target to the satellite. Default: None azimuth_angle_min: clockwise angle with respect to North in a topocentric geodetic ENZ coordinate system from the target to the satellite. Default: None collection_tier: Preference for data to be collected within a certain time after window_open. Can be one of "urgent", "priority", "standard", and "flexible". Default: "standard" collection_type: The collection type sets the collect mode, number of looks, and resolutions for the resulting imagery. The available collection types can be found by submitting: GET https://api.capellaspace.com/collectiontypes contract_id: charge tasking request on explicit contract (if omitted default contract is used) custom_attribute_1: Can be used along with custom_attribute_2 to help you track a Capella task with your own metadata or internal systems. Default: None custom_attribute_2: Can be used along with custom_attribute_1 to help you track a Capella task with your own metadata or internal systems. Default: None description: Can be used along with name to help characterize and describe the tasking request. Default: "" geometry: A GeoJSON representation of the area/point of interest. Must be either a polygon or point image_width: Image width. Units: [m], Default: None local_time: Times, in the timezone of the area where the image will be collected, during which the collect can be taken. Represented by a list of time ranges as seconds in the day. For example, [[21600, 64800]] would allow collects between 6 AM and 6 PM; [[0, 21600], [64800, 86400]] would allow collects between 6 PM and 12 AM as well as from 12 AM to 6 AM. Alternatively, you can pass string values of "day", "night", or "anytime" which are parsed to [[21600, 64800]], [[0, 21600], [64800, 86400]], and [[0, 86400]] respectively. Default: None look_direction: Constraint on view angle. One of "right", "left", "either". Default: "either" max_squint_angle: max. allowed absolute squint angle when generating collects. Units: [degrees]. Default: None name: Can be used along with description to help characterize and describe the tasking request. Default: "" off_nadir_max: Maximum off-nadir angle permitted. Must be greater than off_nadir_min. Default: None off_nadir_min: Minimum off-nadir angle permitted. Must be less than off_nadir_max. Default: None orbital_planes: List of orbital planes allowed to service request. If empty any spacecraft in any plane can service request. One of 45, 53, 97. Default: None polarization: Image polarization. One of "HH", "VV". Default: None pre_approval: will skip the tasking request cost review step if set to true. Default: false product_types: List of analytics to add to the order along with the imagery. Currently available analytics are Vessel classification (VC), Default: None squint: Determines if generated collects will be squinted. One of: enabled, forward, backward. Default: enabled for point requests, disabled for area requests window_close: Latest time (in UTC) that you would like data to be collected. Default: Seven days after window_open window_open: Earliest time (in UTC) that you would like data to be collected. Default: Now Returns: Dict[str, Any]: created tasking request metadata """ return create_tasking_request(session=self._sesh, **kwargs)
[docs] def update_tasking_requests(self, *tasking_request_ids: str, **kwargs) -> dict[str, Any]: """ Update multiple tasking requests with the same field values (parallel) Args: tasking_request_ids: UUIDs of tasking requests to update name: updated name description: updated description custom_attribute_1: updated custom attribute 1 custom_attribute_2: updated custom attribute 2 product_types: updated list of product types Returns: Dict[str, Any]: results keyed by tasking request id — the updated TR dict on success, or ``{"success": False, ...}`` on failure """ filtered_ids = _compact_unique(tasking_request_ids) _validate_uuids(filtered_ids) return update_tasking_requests(*filtered_ids, session=self._sesh, **kwargs)
[docs] def search_tasking_requests(self, **kwargs: Any) -> TaskingRequestSearchResult: """ search tasking requests Find more information at https://docs.capellaspace.com/constellation-tasking/tasking-requests supported query filters: • collection_type: CollectionType, e.g. "spotlight_ultra" • collection_tier: CollectionTier, e.g. "priority" • last_status_time: str, UTC datetime of latest status, e.g. "2020-02-12T00:00:00Z", "2020-02-12" • for_org: boolean: str, scope tasking request search to user's org (requires elevated permissions), e.g. True • instrument_mode: InstrumentMode, e.g. "spotlight" • org_id: str, organization id to list tasking requests for (requires elevated permissions) -- takes precedence over for_org -- , e.g. "34c78a57-2d68-4b4a-a7ba-c188f9e2645d" • status: current TaskingRequestStatus, one of received, review, submitted, active, accepted, rejected, expired, completed, anomaly, canceled, error, failed • submission_time: str, UTC datetime of task submission, e.g. "2020-02-12T00:00:00Z", "2020-02-12" • tasking_request_id: str, tasking request id, e.g. "34c78a57-2d68-4b4a-a7ba-c188f9e2645d" • user_id: str, user id to list tasking requests for (requires elevated permissions) -- takes precedence over for_org -- , e.g. "34c78a57-2d68-4b4a-a7ba-c188f9e2645d" • window_open: str, Earliest UTC datetime of collection, e.g. "2020-02-11" • window_close: str, Latest UTC datetime of collection, e.g. "2020-02-12" • page_size: int, page size, default: 250, needs to be between 250 and 500 • show_progress: bool, display interactive progress bar during pagination, default: False • threaded: bool, enable parallel pagination requests, default: True supported operators: • eq: equality search • gt: greater than • gte: greater than equal • lt: lower than • lte: lower than equal Returns: TaskingRequestSearchResult: matched tasking requests """ search = TaskingRequestSearch(session=self._sesh, **kwargs) return cast(TaskingRequestSearchResult, search.fetch_all())
[docs] def get_task(self, tasking_request_id: str) -> dict[str, Any]: """ fetch task for the specified `tasking_request_id` Args: tasking_request_id: tasking request UUID Returns: Dict[str, Any]: task metadata """ _validate_uuid(tasking_request_id) return get_tasking_request(tasking_request_id=tasking_request_id, session=self._sesh)
[docs] def is_task_completed(self, task: dict[str, Any]) -> bool: """ check if a task has completed """ return _task_contains_status(task, "completed")
[docs] def cancel_tasking_requests(self, *tasking_request_ids: str | None) -> dict[str, Any]: """ cancel tasking requests Find more information `at <https://docs.capellaspace.com/constellation-tasking/cancel-task>`_. For Cancellation fees please refer to `Capella's Tasking Cancellation Policy Overview <https://support.capellaspace.com/what-is-the-tasking-cancellation-policy>`_. Args: tasking_request_ids: list of tasking_request_ids to cancel Returns: Dict[str, Any]: cancel results in the following format:: { "<tr-id-1>": { "success": True, # → no "error" field }, "<tr-id-2>": { "success": False, # ← **only when success=False** "error": {} }, ... } """ filtered_tasking_request_ids = _compact_unique(tasking_request_ids) _validate_uuids(filtered_tasking_request_ids) if self.verbose: plural_s = "s" if len(filtered_tasking_request_ids) > 1 else "" logger.info(f"attempting to cancel {len(filtered_tasking_request_ids)} tasking request{plural_s}") results_by_tr_id = cancel_tasking_requests(*filtered_tasking_request_ids, session=self._sesh) if self.verbose: print_cancelation_result(results_by_tr_id, task_type="tasking") return results_by_tr_id
# COLLECTS
[docs] def get_collects_for_task(self, tasking_request_id: str) -> list[dict[str, Any]]: """ get all the collects associated with this task (see :py:meth:`get_task()`) Args: task: task metadata - return of :py:meth:`get_task()` Returns: List[Dict[str, Any]]: collect metadata associated """ task = self.get_task(tasking_request_id) tasking_request_id = task["properties"]["taskingrequestId"] if not self.is_task_completed(task): raise TaskNotCompleteError(f"TaskingRequest<{tasking_request_id}> is not in completed state") collects_list_resp = self._sesh.get(f"/collects/list/{tasking_request_id}") return collects_list_resp.json()
# REPEAT REQUESTS
[docs] def create_repeat_request(self, **kwargs) -> dict[str, Any]: """ Create a new repeat request Find more information at https://docs.capellaspace.com/constellation-tasking/tasking-requests Args: archive_holdback: If defined will specify a time period during which the resulting imagery will be kept from the publicly accessible archive. One of "none", "one_year", "thirty_day", "permanent". Default: "none" asc_dsc: Constraint on ascending/descending pass. One of "ascending", "descending", "either". Default: "either" azimuth_angle_max: clockwise angle with respect to North in a topocentric geodetic ENZ coordinate system from the target to the satellite. Default: None azimuth_angle_min: clockwise angle with respect to North in a topocentric geodetic ENZ coordinate system from the target to the satellite. Default: None azimuth_angle_tolerance: Tolerance to azimuth-angle deviations across all acquisitions. Units: [degrees] collection_tier: Preference for data to be collected within a certain time after window_open. Can be either "flexible" or "routine". Default: "routine" collection_type: The collection type sets the collect mode, number of looks, and resolutions for the resulting imagery. The available collection types can be found by submitting: GET https://api.capellaspace.com/collectiontypes contract_id: charge repeat request on explicit contract (if omitted default contract is used) custom_attribute_1: Can be used along with custom_attribute_2 to help you track a Capella task with your own metadata or internal systems. Default: None custom_attribute_2: Can be used along with custom_attribute_1 to help you track a Capella task with your own metadata or internal systems. Default: None description: Can be used along with name to help characterize and describe the tasking request. Default: "" geometry: A GeoJSON representation of the area/point of interest. Must be either a polygon or point image_width: Image width. Units: [m], Default: None insar_orbit: Insar Orbit identifier - required for insar collection_tier, MIO_53_2P95 local_time: Times, in the timezone of the area where the image will be collected, during which the collect can be taken. Represented by a list of time ranges as seconds in the day. For example, [[21600, 64800]] would allow collects between 6 AM and 6 PM; [[0, 21600], [64800, 86400]] would allow collects between 6 PM and 12 AM as well as from 12 AM to 6 AM. Alternatively, you can pass string values of "day", "night", or "anytime" which are parsed to [[21600, 64800]], [[0, 21600], [64800, 86400]], and [[0, 86400]] respectively. Default: None look_angle_tolerance: Tolerance to look-angle deviations across all acquisitions. Units: [degrees] look_direction: Constraint on view angle. One of "right", "left", "either". Default: "either" maintain_scene_framing: Flag to maintain consistent framing (look-direction, ascending/descending, orbital-plane) across all acquisitions. max_squint_angle: max. allowed absolute squint angle when generating collects. Units: [degrees]. Default: None name: Can be used along with description to help characterize and describe the tasking request. Default: "" off_nadir_max: Maximum off-nadir angle permitted. Must be greater than off_nadir_min. Default: None off_nadir_min: Minimum off-nadir angle permitted. Must be less than off_nadir_max. Default: None orbital_planes: List of orbital planes allowed to service request. If empty any spacecraft in any plane can service request. One of 45, 53, 97. Default: None polarization: Image polarization. One of "HH", "VV". Default: None product_types: List of analytics to add to the order along with the imagery. Currently available analytics are Vessel classification (VC), Default: None repeat_end: Starting date (in UTC) when you would like data to stop being collected. This and repetition_count are mutually exclusive; only one of them can be defined per request. Default: None repeat_start: Starting date (in UTC) when you would like data to begin being collected. Default: Now repetition_count: Total number of acquisitions in the repeat series. This and repeat_end are mutually exclusive; only one of them can be defined per request. Default: None repetition_interval: Number of days between the start of each derived request. Default: 7 squint: Determines if generated collects will be squinted. One of: enabled, forward, backward. Default: enabled for point requests, disabled for area requests window_duration: Duration of derived repeat requests in seconds Returns: Dict[str, Any]: created repeat request metadata """ return create_repeat_request(session=self._sesh, **kwargs)
[docs] def search_repeat_requests(self, **kwargs: Any) -> RepeatRequestSearchResult: """ search repeat requests Find more information at https://docs.capellaspace.com/constellation-tasking/searching-tasking-and-repeat-requests supported query filters: • collection_type: CollectionType, e.g. "spotlight_ultra" • collection_tier: CollectionTier, e.g. "priority" • last_status_time: str, UTC datetime of latest status, e.g. "2020-02-12T00:00:00Z", "2020-02-12" • for_org: boolean: str, scope repeat request search to user's org (requires elevated permissions), e.g. True • instrument_mode: InstrumentMode, e.g. "spotlight" • org_id: str, organization id to list repeat requests for (requires elevated permissions) -- takes precedence over for_org -- , e.g. "34c78a57-2d68-4b4a-a7ba-c188f9e2645d" • repeat_request_id: str, repeat request id, e.g. "34c78a57-2d68-4b4a-a7ba-c188f9e2645d" • repeat_start: str, UTC datetime of beginning of window recurrences, e.g. "2020-02-11" • repeat_end: str, UTC datetime of end of window recurrences, e.g. "2020-02-12" • repetition_interval: int, number of days between the start of derived requests, e.g. 7 • status: current status of repeat request, one of received, review, submitted, active, accepted, rejected, expired, completed, anomaly, canceled, error, failed • submission_time: str, UTC datetime of task submission, e.g. "2020-02-12T00:00:00Z", "2020-02-12" • user_id: str, user id to list repeat requests for (requires elevated permissions) -- takes precedence over for_org -- , e.g. "34c78a57-2d68-4b4a-a7ba-c188f9e2645d" • page_size: int, page size, default: 250, needs to be between 250 and 500 • show_progress: bool, display interactive progress bar during pagination, default: False • threaded: bool, enable parallel pagination requests, default: True supported operators: • eq: equality search • gt: greater than • gte: greater than equal • lt: lower than • lte: lower than equal Returns: RepeatRequestSearchResult: matched repeat requests """ search = RepeatRequestSearch(session=self._sesh, **kwargs) return cast(RepeatRequestSearchResult, search.fetch_all())
[docs] def cancel_repeat_requests(self, *repeat_request_ids: str | None) -> dict[str, Any]: """ cancel repeat requests Find more information `here <https://docs.capellaspace.com/constellation-tasking/cancel-task>`__. For Cancellation fees please refer to `Capella's Tasking Cancellation Policy Overview <https://support.capellaspace.com/what-is-the-tasking-cancellation-policy>`_. Args: repeat_request_ids: list of repeat_request_ids to cancel Returns: Dict[str, Any]: cancel results in the following format:: { "<tr-id-1>": { "success": True, # → no "error" field }, "<tr-id-2>": { "success": False, # ← **only when success=False** "error": {} }, ... } """ filtered_repeat_request_ids = _compact_unique(repeat_request_ids) _validate_uuids(filtered_repeat_request_ids) if self.verbose: plural_s = "s" if len(filtered_repeat_request_ids) > 1 else "" logger.info(f"attempting to cancel {len(filtered_repeat_request_ids)} repeat request{plural_s}") results_by_tr_id = cancel_repeat_requests(*filtered_repeat_request_ids, session=self._sesh) if self.verbose: print_cancelation_result(results_by_tr_id, task_type="repeat") return results_by_tr_id
[docs] def update_repeat_requests(self, *repeat_request_ids: str, **kwargs) -> dict[str, Any]: """ Update multiple repeat requests with the same field values (parallel) Args: repeat_request_ids: UUIDs of the repeat requests to update name: updated name description: updated description custom_attribute_1: updated custom attribute 1 custom_attribute_2: updated custom attribute 2 product_types: updated list of product types Returns: Dict[str, Any]: update results keyed by repeat request ID """ filtered_ids = _compact_unique(repeat_request_ids) _validate_uuids(filtered_ids) return update_repeat_requests(*filtered_ids, session=self._sesh, **kwargs)
# ORDER
[docs] def list_orders(self, *order_ids: str | None, is_active: bool | None = False) -> list[dict[str, Any]]: """ list orders Args: order_id: list only specific orders (variadic, specify multiple) - if omitted all orders are listed is_active: list only active (non-expired) orders Returns: List[Dict[str, Any]]: metadata of orders """ if order_ids: filtered_order_ids = _compact_unique(order_ids) _validate_uuids(filtered_order_ids) else: filtered_order_ids = [] # prefilter non expired if is_active: orders = get_non_expired_orders(session=self._sesh) if filtered_order_ids: orders = [o for o in orders if o["orderId"] in set(filtered_order_ids)] return orders # list specific orders if filtered_order_ids: orders = [self._sesh.get(f"/orders/{order_id}").json() for order_id in filtered_order_ids] return orders # list all orders of customer params = { "customerId": self._sesh.customer_id, } resp = self._sesh.get("/orders", params=params) orders = resp.json() return orders
[docs] def get_stac_items_of_order(self, order_id: str, ids_only: bool = False) -> list[str] | StacSearchResult: """ get stac items of an existing order Args: order_id: order id """ _validate_uuid(order_id) order_meta = self.list_orders(order_id)[0] stac_ids = [item["granuleId"] for item in order_meta["items"]] if ids_only: return stac_ids return self.search(ids=stac_ids)
def review_order( self, stac_ids: list[str] | None = None, items: list[dict[str, Any]] | StacSearchResult | None = None, contract_id: str | None = None, ) -> dict[str, Any]: stac_ids = _validate_stac_id_or_stac_items(stac_ids, items) logger.info(f"reviewing order for {', '.join(stac_ids)}") stac_items: list[dict[str, Any]] | StacSearchResult if not items: stac_items = self.search(ids=stac_ids) else: stac_items = items if not stac_items: raise NoValidStacIdsError(f"No valid STAC IDs in {', '.join(stac_ids)}") # review order order_payload = self._construct_order_payload(stac_items, contract_id) review_order_response = self._sesh.post("/orders/review", json=order_payload).json() if not review_order_response.get("authorized", False): raise InsufficientFundsError(review_order_response["authorizationDenialReason"]["message"]) return review_order_response
[docs] def submit_order( self, stac_ids: list[str] | None = None, items: list[dict[str, Any]] | StacSearchResult | None = None, check_active_orders: bool = False, omit_search: bool = False, omit_review: bool = False, contract_id: str | None = None, ) -> str: """ submit an order by `stac_ids` or `items`. NOTE: Precedence order (high to low) 1. stac_ids 2. items Args: stac_ids: STAC IDs that active order should include items: STAC items, returned by :py:meth:`search` check_active_orders: check if any active order containing ALL `stac_ids` is available if True: returns that order ID if False: submits a new order and returns new order ID omit_search: omit search to ensure provided STAC IDs are valid - only works if `items` are provided omit_review: omit review stage contract_id: charge order on explicit contract (if omitted default contract is used) Returns: str: order UUID """ stac_ids = _validate_stac_id_or_stac_items(stac_ids, items) if check_active_orders: existing_order_id = get_order(session=self._sesh, stac_ids=stac_ids) if existing_order_id is not None: logger.info(f"found existing order {existing_order_id} containing all requested stac ids") return existing_order_id def _get_stac_items() -> list[dict[str, Any]] | StacSearchResult: stac_items: list[dict[str, Any]] | StacSearchResult if stac_ids and not omit_search: stac_items = self.search(ids=stac_ids) else: if omit_search and not items: logger.warning( "setting omit_search=True only works in combination providing items instead of stac_ids" ) stac_items = self.search(ids=stac_ids) else: if items is None: raise ValueError("items must be provided when omit_search is True") stac_items = items if not stac_items: raise NoValidStacIdsError(f"No valid STAC IDs in {', '.join(stac_ids)}") return stac_items stac_items = _get_stac_items() if not omit_review: self.review_order(items=stac_items, contract_id=contract_id) logger.info(f"submitting order for {', '.join(stac_ids)}") order_payload = self._construct_order_payload(stac_items, contract_id) res_order = self._sesh.post("/orders", json=order_payload) con = res_order.json() order_id = con["orderId"] if con["orderStatus"] == "rejected": raise OrderRejectedError(f"Order for {', '.join(stac_ids)} rejected.") logger.info(f"successfully submitted order {order_id}") return order_id
def _construct_order_payload(self, stac_items, contract_id: str | None = None): by_collect_id = defaultdict(list) for item in stac_items: by_collect_id[item["collection"]].append(item["id"]) order_items = [] for collection, stac_ids_of_coll in by_collect_id.items(): order_items.extend([{"collectionId": collection, "granuleId": stac_id} for stac_id in stac_ids_of_coll]) payload: dict[str, Any] = {"items": order_items} if contract_id: payload["contractId"] = contract_id return payload
[docs] def get_presigned_items( self, order_id: str, stac_ids: list[str] | None = None, sort_by: list[str] | None = None, ) -> list[dict[str, Any]]: """ get presigned items hrefs for all products contained in order Args: order_id: active order ID (see :py:meth:`submit_order`) stac_ids: filter presigned assets by STAC IDs sort_by: list of stac ids to sort by Returns: List[Dict[str, Any]]: List of assets of respective product, e.g. .. highlight:: python .. code-block:: python [ { "<asset_type>": { "title": ..., "href": ..., "type": ... }, ... } ] """ _validate_uuid(order_id) logger.info(f"getting presigned items for order {order_id}") response = self._sesh.get(f"/orders/{order_id}/download") presigned_stac_items = response.json() # ensure sort sort_by = _validate_and_filter_stac_ids(sort_by) if sort_by: presigned_stac_items = _sort_stac_items(items=presigned_stac_items, stac_ids=sort_by) # no filter if not stac_ids: return presigned_stac_items stac_ids_set = set(stac_ids) return [item for item in presigned_stac_items if item["id"] in stac_ids_set]
[docs] def get_presigned_assets( self, order_id: str, stac_ids: list[str] | None = None, sort_by: list[str] | None = None, assets_only: bool | None = True, ) -> list[dict[str, Any]]: """ get presigned assets hrefs for all products contained in order Args: order_id: active order ID (see :py:meth:`submit_order`) stac_ids: filter presigned assets by STAC IDs sort_by: list of stac ids to sort by Returns: List[Dict[str, Any]]: List of assets of respective product, e.g. .. highlight:: python .. code-block:: python [ { "<asset_type>": { "title": ..., "href": ..., "type": ... }, ... } ] """ if not assets_only: logger.warning( "`assets_only` is kept for backwards compatibility but has no effect. Please use `get_presigned_items` instead" ) items_presigned = self.get_presigned_items(order_id, stac_ids, sort_by) return [item["assets"] for item in items_presigned]
[docs] def get_asset_bytesize(self, pre_signed_url: str) -> int: """get size in bytes of `pre_signed_url`""" return _get_asset_bytesize(pre_signed_url)
# DOWNLOAD
[docs] def download_asset( self, pre_signed_url: str, local_path: Path | S3Path | str = Path(tempfile.gettempdir()), override: bool = False, show_progress: bool = False, enable_resume: bool = True, ) -> Path | S3Path: """ downloads a presigned asset url to disk Args: pre_signed_url: presigned asset url, see :py:meth:`get_presigned_items` local_path: output path - file is written to OS's temp dir if not provided, if directory provided filename will be set to original asset filename override: override already existing `local_path` show_progress: show download status progressbar enable_resume: enable resuming partial downloads (default: True). If enabled, partially downloaded files will be resumed from the last byte using HTTP Range headers. If the server doesn't support Range, the file will be re-downloaded from the start. """ # Convert str to Path/S3Path if needed resolved_local_path: Path | S3Path if isinstance(local_path, str): if local_path.startswith("s3://"): resolved_local_path = S3Path(local_path) else: resolved_local_path = Path(local_path) else: resolved_local_path = local_path # _download_asset will handle directory paths by creating a file in them dl_request = DownloadRequest( url=pre_signed_url, local_path=resolved_local_path, asset_key="asset", ) return _perform_download( download_requests=[dl_request], override=override, threaded=False, show_progress=show_progress, enable_resume=enable_resume, )["asset"]
[docs] def download_products( self, items_presigned: list[dict[str, Any]] | None = None, order_id: str | None = None, tasking_request_id: str | None = None, collect_id: str | None = None, local_dir: Path | S3Path | str = Path(tempfile.gettempdir()), include: list[str | AssetType] | str | None = None, exclude: list[str | AssetType] | str | None = None, override: bool = False, threaded: bool = True, show_progress: bool = False, separate_dirs: bool = True, product_types: list[str | ProductType] | None = None, contract_id: str | None = None, enable_resume: bool = True, ) -> dict[str, dict[str, Path | S3Path]]: """ download all assets of multiple products Args: items_presigned: stac items with presigned assets, see :py:meth:`get_presigned_items` order_id: optionally provide `order_id` instead of `assets_presigned`, see :py:meth:`submit_order` tasking_request_id: tasking request UUID of the task request you wish to download all associated products for collect_id: collect UUID you wish to download all associated products for NOTE: Precedence order (high to low) 1. items_presigned 2. order_id 3. tasking_request_id 4. collect_id Meaning e.g. assets_presigned takes precedence over order_id, ... local_dir: Path where assets are saved to, tempdir if not provided include: white-listing, which assets should be included, e.g. ["HH"] => only download HH asset exclude: black-listing, which assets should be excluded, e.g. ["HH", "thumbnail"] => download ALL except HH and thumbnail assets NOTE: explicit DENY overrides explicit ALLOW asset choices: * 'HH', 'VV', 'raster', 'metadata', 'thumbnail' (external) - raster == 'HH' || 'VV' * 'log', 'profile', 'stats', 'stats_plots' (internal) override: override already existing threaded: download assets of product in multiple threads show_progress: show download status progressbar separate_dirs: set to True in order to save the respective product assets into products directories, i.e. /tmp/<stac_id_1>/<stac_id_1>.tif /tmp/<stac_id_2>/<stac_id_2>.tif ... set to False in order to the respective product assets directly into the provided `local_dir`, i.e. /tmp/<stac_id_1>.tif /tmp/<stac_id_2>.tif ... product_types: filter by product type, e.g. ["SLC", "GEO"] contract_id: charge order on explicit contract (if omitted default contract is used) enable_resume: enable resuming partial downloads (default: True). If enabled, partially downloaded files will be resumed from the last byte using HTTP Range headers. If the server doesn't support Range, files will be re-downloaded from the start. Returns: Dict[str, Dict[str, Path]]: Local paths of downloaded files keyed by STAC id and asset type, e.g. .. highlight:: python .. code-block:: python { "stac_id_1": { "<asset_type>": <path-to-asset>, ... } } """ one_of_required = (items_presigned, order_id, tasking_request_id, collect_id) if not any(map(bool, one_of_required)): raise ValueError("please provide one of assets_presigned, order_id, tasking_request_id or collect_id") product_types = _validate_and_filter_product_types(product_types) include_filtered: list[str] | None = _validate_and_filter_asset_types(include) exclude_filtered: list[str] | None = _validate_and_filter_asset_types(exclude) if not items_presigned: items_presigned = self._resolve_items_presigned( order_id, tasking_request_id, collect_id, product_types, contract_id ) len_items_presigned = len(items_presigned) suffix = "s" if len_items_presigned > 1 else "" # filter product_type if product_types: items_presigned = _filter_items_by_product_types(items_presigned, product_types) logger.info(f"downloading {len_items_presigned} product{suffix}") # gather download requests download_requests = [] by_stac_id: dict[str, dict[str, Path | S3Path]] = {} for cur_item in items_presigned: cur_download_requests = _gather_download_requests( cur_item["assets"], local_dir, include_filtered, exclude_filtered, separate_dirs ) by_stac_id[cur_download_requests[0].stac_id] = { cur.asset_key: cur.local_path for cur in cur_download_requests } download_requests.extend(cur_download_requests) if not download_requests: logger.warning("Nothing to download") return by_stac_id # download _perform_download( download_requests=download_requests, override=override, threaded=threaded, show_progress=show_progress, enable_resume=enable_resume, ) return by_stac_id
def _resolve_items_presigned( self, order_id: str | None = None, tasking_request_id: str | None = None, collect_id: str | None = None, product_types: list[str] = None, contract_id: str | None = None, ) -> list[dict[str, Any]]: stac_ids: list[str] | None = None # 1 - resolve assets_presigned from order_id if order_id: _validate_uuid(order_id) else: # 2 - submit order for tasking_request_id if tasking_request_id: _validate_uuid(tasking_request_id) order_id, stac_ids = self._order_products_for_task(tasking_request_id, product_types, contract_id) # 3 - submit order for collect_id else: if collect_id is None: raise ValueError("One of order_id, tasking_request_id, or collect_id must be provided") _validate_uuid(collect_id) order_id, stac_ids = self._order_products_for_collect_ids( collect_ids=[collect_id], product_types=product_types, contract_id=contract_id ) return self.get_presigned_items(order_id, stac_ids) def _order_products_for_task( self, tasking_request_id: str, product_types: list[str] = None, contract_id: str | None = None ) -> tuple[str, list[str]]: """ order all products associated with a tasking request Args: tasking_request_id: tasking request UUID you wish to order all associated products for """ # gather up all collect IDs associated of this task collect_ids = [coll["collectId"] for coll in self.get_collects_for_task(tasking_request_id)] return self._order_products_for_collect_ids(collect_ids, product_types) def _order_products_for_collect_ids( self, collect_ids: list[str], product_types: list[str] = None, contract_id: str | None = None ) -> tuple[str, list[str]]: search_kwargs = dict( collect_id__in=collect_ids, ) if product_types: search_kwargs["product_type__in"] = product_types result = self.search(**search_kwargs) if not result: logger.warning("No STAC items found ... aborting") sys.exit(0) order_id = self.submit_order(items=result, omit_search=True, check_active_orders=True, contract_id=contract_id) return order_id, result.stac_ids
[docs] def download_product( self, assets_presigned: dict[str, Any] | None = None, order_id: str | None = None, local_dir: Path | S3Path | str = Path(tempfile.gettempdir()), include: list[str | AssetType] | str | None = None, exclude: list[str | AssetType] | str | None = None, override: bool = False, threaded: bool = True, show_progress: bool = False, ) -> dict[str, Path | S3Path]: """ download all assets of a product (TO BE DEPRECATED) Args: assets_presigned: mapping of presigned assets of multiple products, see :py:meth:`get_presigned_assets` order_id: optionally provide `order_id` instead of `assets_presigned`, see :py:meth:`submit_order` NOTE: Precedence order (high to low) 1. assets_presigned 2. order_id local_dir: Path where assets are saved to, tempdir if not provided include: white-listing, which assets should be included, e.g. ["HH"] => only download HH asset exclude: black-listing, which assets should be excluded, e.g. ["HH", "thumbnail"] => download ALL except HH and thumbnail assets NOTE: explicit DENY overrides explicit ALLOW asset choices: * 'HH', 'VV', 'raster', 'metadata', 'thumbnail' (external) Note: raster == 'HH' || 'VV' * 'log', 'profile', 'stats', 'stats_plots' (internal accessible only) override: override already existing threaded: download assets of product in multiple threads show_progress: show download status progressbar Returns: Dict[str, Path]: Local paths of downloaded files keyed by asset type, e.g. .. highlight:: python .. code-block:: python { "<asset_type>": <path-to-asset>, ... } """ logger.warning("this method will be deprecated in future revisions. Please use `download_products` instead.") if not assets_presigned and not order_id: raise ValueError("please provide either assets_presigned or order_id") if not assets_presigned: if order_id is None: raise ValueError("Either assets_presigned or order_id must be provided") _validate_uuid(order_id) assets_presigned = self._get_first_presigned_from_order(order_id) include_filtered: list[str] | None = _validate_and_filter_asset_types(include) exclude_filtered: list[str] | None = _validate_and_filter_asset_types(exclude) download_requests = _gather_download_requests(assets_presigned, local_dir, include_filtered, exclude_filtered) if not download_requests: logger.warning("Nothing to download") return {} return _perform_download( download_requests=download_requests, override=override, threaded=threaded, show_progress=show_progress, )
def _get_first_presigned_from_order(self, order_id: str) -> dict[str, Any]: assets_presigned = self.get_presigned_assets(order_id) if len(assets_presigned) > 1: stac_id = _derive_stac_id(assets_presigned[0]) logger.warning(f"order {order_id} contains {len(assets_presigned)} products - using first one ({stac_id})") return assets_presigned[0] # CATALOG EARCH
[docs] def search(self, **kwargs) -> StacSearchResult: """ search Capella's [S]patio [T]emporal [A]ssets [C]atalog Find more information `here <https://docs.capellaspace.com/accessing-data/searching-for-data>`__. supported query filters: • azimuth_angle: float, e.g. 123.4 • bbox: List[float, float, float, float], e.g. [12.35, 41.78, 12.61, 42] • billable_area: Billable Area in m^2 • center_frequency: Union[int, float], Center Frequency (GHz) • collections: List[str], e.g. ["capella-open-data"] • collect_id: str, capella internal collect-uuid, e.g. "78616ccc-0436-4dc2-adc8-b0a1e316b095" • collection_type: str, capella collection type, e.g. "spotlight_ultra" • constellation: str, e.g. "capella" • datetime: str, e.g. "2020-02-12T00:00:00Z" • epsg: int, e.g. 32648 • frequency_band: str, Frequency band, one of "P", "L", "S", "C", "X", "Ku", "K", "Ka" • ids: List[str], e.g. `["CAPELLA_C02_SP_GEO_HH_20201109060434_20201109060437"]` • image_formation_algorithm: str, Image Formation Algorithm, one of "pfa", "backprojection" • intersects: geometry component of the GeoJSON, e.g. {'type': 'Point', 'coordinates': [-113.1, 51.1]} • incidence_angle: Union[int, float], Center incidence angle, between 0 and 90 • instruments: List[str], leveraged instruments, e.g. ["capella-radar-5"] • instrument_mode: str, Instrument mode, one of "spotlight", "stripmap", "sliding_spotlight" • limit: int, default: 500 • layover_angle: str, e.g. -0.1 • local_datetime: str, local datetime, e.g. 2022-12-12TT07:37:42.324551+0800 • local_time: str, local time, e.g. 07:37:42.324551 • local_timezone: str, local timezone, e.g. Asia/Shanghai • look_angle: Union[int, float], e.g. 28.4 • looks_azimuth: int, e.g. 5 • looks_equivalent_number: int, Equivalent number of looks (ENL), e.g. 3 • looks_range: int, e.g. 5 • observation_direction: str, Antenna pointing direction, one of "right", "left" • orbit_state: str, Orbit State, one of "ascending", "descending" • orbital_plane: int, Orbital Plane, inclination angle of orbit • pixel_spacing_azimuth: Union[int, float], Pixel spacing azimuth (m), e.g. 0.5 • pixel_spacing_range: Union[int, float], Pixel spacing range (m), e.g. 0.5 • platform: str, e.g. "capella-2" • polarizations: str, one of "HH", "VV", "HV", "VH" • product_type: str, one of "SLC", "GEO" • resolution_azimuth: float, Resolution azimuth (m), e.g. 0.5 • resolution_ground_range: float, Resolution ground range (m), e.g. 0.5 • resolution_range: float, Resolution range (m), e.g. 0.5 • squint_angle: float, Squint angle, e.g. 30.1 • ownership: str, one of "ownedByOrganization", "sharedWithOrganization", "availableForPurchase", "publiclyAvailable" supported operators: • eq: equality search • in: within group • gt: greater than • gte: greater than equal • lt: lower than • lte: lower than equal sorting: • sortby: List[str] - must be supported fields, e.g. ["+datetime"] Returns: StacSearchResult: STAC items matched """ search = StacSearch(session=self._sesh, **kwargs) return search.fetch_all()
def catalog_search(self, **kwargs) -> StacSearchResult: return self.search(**kwargs)