Source code for capella_console_client.client

import logging
import sys

from datetime import datetime
from typing import List, Dict, Any, Union, Optional, no_type_check, Tuple
from collections import defaultdict
from pathlib import Path
import tempfile

import dateutil.parser

from capella_console_client.config import CONSOLE_API_URL
from capella_console_client.session import CapellaConsoleSession
from capella_console_client.logconf import logger
from capella_console_client.exceptions import (
    InsufficientFundsError,
    OrderRejectedError,
    NoValidStacIdsError,
    TaskNotCompleteError,
)
from capella_console_client.enumerations import ProductType, AssetType

from capella_console_client.assets import (
    _perform_download,
    DownloadRequest,
    _gather_download_requests,
    _get_asset_bytesize,
    _derive_stac_id,
    _filter_items_by_product_types,
)
from capella_console_client.search import StacSearch, SearchResult
from capella_console_client.tasking_request import get_tasking_requests, _task_contains_status, create_tasking_request
from capella_console_client.repeat_request import create_repeat_request
from capella_console_client.validate import (
    _validate_uuid,
    _validate_stac_id_or_stac_items,
    _validate_and_filter_product_types,
    _validate_and_filter_asset_types,
    _validate_and_filter_stac_ids,
)
from capella_console_client.sort import _sort_stac_items


[docs]class CapellaConsoleClient: """ API client for https://api.capellaspace.com. API docs: https://docs.capellaspace.com/accessing-data/searching-for-data Args: email: email on api.capellaspace.com password: password on api.capellaspace.com token: valid JWT access token verbose: flag to enable verbose logging no_token_check: does not check if provided JWT token is valid base_url: Capella console API base URL override search_url: Capella catalog/search/ override no_auth: bypass authentication NOTE: not providing either email and password or a jwt token for authentication will prompt you for email and password, which is not what you want in a script NOTE: Precedence order (high to low) 1. email and password 2. JWT token """ def __init__( self, email: Optional[str] = None, password: Optional[str] = None, token: Optional[str] = None, verbose: bool = False, no_token_check: bool = False, base_url: Optional[str] = CONSOLE_API_URL, search_url: Optional[str] = None, no_auth: bool = False, ): self._set_verbosity(verbose) self._sesh = CapellaConsoleSession(base_url=base_url, search_url=search_url, verbose=verbose) if not no_auth: self._sesh.authenticate(email, password, token, no_token_check) def _set_verbosity(self, verbose: bool = False): self.verbose = verbose logger.setLevel(logging.WARNING) if verbose: logger.setLevel(logging.INFO) # USER
[docs] def whoami(self) -> Dict[str, Any]: """ display user info Returns: Dict[str, Any]: return of GET /user """ resp = self._sesh.get("/user") return resp.json()
# TASKING
[docs] def create_tasking_request(self, **kwargs): """ Create a new tasking request Find more information at https://docs.capellaspace.com/constellation-tasking/tasking-requests Args: geometry: A GeoJSON representation of the area/point of interest. Must be either a polygon or point name: Can be used along with description to help characterize and describe the tasking request. Default: "" description: Can be used along with name to help characterize and describe the tasking request. Default: "" window_open: Earliest time (in UTC) that you would like data to be collected. Default: Now window_close: Latest time (in UTC) that you would like data to be collected. Default: Seven days after window_open collection_tier: Preference for data to be collected within a certain time after window_open. Can be one of "urgent", "priority", "standard", and "flexible". Default: "standard" product_category: Category used to define image collection. "Extended" has broader look angles and "Custom" allows specifying advanced image acquisition parameters. More information on the specifics of each can be found at https://support.capellaspace.com/hc/en-us/articles/360049110852-SAR-Imagery-Product-Tasking-Categories. One of "standard", "extended", and "custom". Default: "standard" product_types: List of analytics to add to the order along with the imagery. Currently available analytics are Amplitude Change Detection and Vessel Detection. One of "ACD", "VS". Default: None archive_holdback: If defined will specify a time period during which the resulting imagery will be kept from the publicly accessible archive. One of "none", "one_year", "thirty_day", "permanent". Default: "none" custom_attribute_1: Can be used along with custom_attribute_2 to help you track a Capella task with your own metadata or internal systems. Default: None custom_attribute_2: Can be used along with custom_attribute_1 to help you track a Capella task with your own metadata or internal systems. Default: None collect_mode: Collect mode to be used by the satellite when making the collect. One of "spotlight", "stripmap", "sliding_spotlight". Default: "spotlight" look_direction: Constraint on view angle. One of "right", "left", "either". Default: "either" asc_dsc: Constraint on ascending/descending pass. One of "ascending", "descending", "either". Default: "either" orbital_planes: List of orbital planes allowed to service request. If empty any spacecraft in any plane can service request. One of 45, 53, 97. Default: None local_time: Times, in the timezone of the area where the image will be collected, during which the collect can be taken. Represented by a list of time ranges as seconds in the day. For example, [[21600, 64800]] would allow collects between 6 AM and 6 PM; [[0, 21600], [64800, 86400]] would allow collects between 6 PM and 12 AM as well as from 12 AM to 6 AM. Alternatively, you can pass string values of "day", "night", or "anytime" which are parsed to [[21600, 64800]], [[0, 21600], [64800, 86400]], and [[0, 86400]] respectively. Default: None off_nadir_min: Minimum off-nadir angle permitted. Must be less than off_nadir_max. Default: None off_nadir_max: Maximum off-nadir angle permitted. Must be greater than off_nadir_min. Default: None elevation_min: Minimum elevation angle permitted. Default: None elevation_max: Maximum elevation angle permitted. Default: None image_length: Image length. Default: None image_width: Image width. Default: None azimuth: Azimuth angle at collect mid-time. Clockwise angle from North to the spacecraft in the target frame of reference. Default: None grr_min: Minimum ground-range resolution. Minimum is in ordinal sense. Default: None grr_max: Maximum ground-range resolution. Maximum is in ordinal sense. Default: None srr_min: Minimum slant-range resolution. Minimum is in ordinal sense. Default: None srr_max: Maximum slant-range resolution. Maximum is in ordinal sense. Default: None azr_min: Minimum azimuth resolution. Minimum is in ordinal sense. Default: None azr_max: Maximum azimuth resolution. Maximum is in ordinal sense. Default: None nesz_max: Maximum allowable NESZ of resulting collect. Default: None num_looks: Number of looks to use in processing collect. Default: None polarization: Image polarization. One of "HH", "VV". Default: None Returns: Dict[str, Any]: created tasking request metadata """ return create_tasking_request(session=self._sesh, **kwargs)
[docs] def list_tasking_requests( self, *tasking_request_ids: Optional[str], for_org: Optional[bool] = False, **kwargs: Optional[Dict[str, Any]] ) -> List[Dict[str, Any]]: """ list/ search tasking requests Find more information at https://docs.capellaspace.com/constellation-tasking/tasking-requests Args: tasking_request_ids: list only specific tasking_request_ids (variadic, specify multiple) for_org: list all tasking requests of your organization (instead of only yours) - **requires** organization index/ admin permission additionally the following search filters are supported: • status: TaskingRequestStatus, one of received, review, submitted, active, accepted, rejected, expired, completed, anomaly, canceled, error, failed • submission_time__gt: datetime, e.g. datetime.datetime(2022, 12, 9, 21) Returns: List[Dict[str, Any]]: metadata of tasking requests """ return get_tasking_requests(*tasking_request_ids, session=self._sesh, for_org=for_org, **kwargs)
[docs] def get_task(self, tasking_request_id: str) -> Dict[str, Any]: """ fetch task for the specified `tasking_request_id` Args: tasking_request_id: tasking request UUID Returns: Dict[str, Any]: task metadata """ task_response = self._sesh.get(f"/task/{tasking_request_id}") return task_response.json()
[docs] def is_task_completed(self, task: Dict[str, Any]) -> bool: """ check if a task has completed """ return _task_contains_status(task, "completed")
[docs] def get_collects_for_task(self, tasking_request_id: str) -> List[Dict[str, Any]]: """ get all the collects associated with this task (see :py:meth:`get_task()`) Args: task: task metadata - return of :py:meth:`get_task()` Returns: List[Dict[str, Any]]: collect metadata associated """ task = self.get_task(tasking_request_id) tasking_request_id = task["properties"]["taskingrequestId"] if not self.is_task_completed(task): raise TaskNotCompleteError(f"TaskingRequest<{tasking_request_id}> is not in completed state") collects_list_resp = self._sesh.get(f"/collects/list/{tasking_request_id}") return collects_list_resp.json()
# REPEAT REQUESTS
[docs] def create_repeat_request(self, **kwargs): """ Create a new repeat request Find more information at https://docs.capellaspace.com/constellation-tasking/tasking-requests Args: geometry: A GeoJSON representation of the area/point of interest. Must be either a polygon or point name: Can be used along with description to help characterize and describe the tasking request. Default: "" description: Can be used along with name to help characterize and describe the tasking request. Default: "" collection_tier: Preference for data to be collected within a certain time after window_open. Can be either "flexible" or "routine". Default: "routine" product_category: Category used to define image collection. "Extended" has broader look angles and "Custom" allows specifying advanced image acquisition parameters. More information on the specifics of each can be found at https://support.capellaspace.com/hc/en-us/articles/360049110852-SAR-Imagery-Product-Tasking-Categories. One of "standard", "extended", and "custom". Default: "standard" archive_holdback: If defined will specify a time period during which the resulting imagery will be kept from the publicly accessible archive. One of "none", "one_year", "thirty_day", "permanent". Default: "none" custom_attribute_1: Can be used along with custom_attribute_2 to help you track a Capella task with your own metadata or internal systems. Default: None custom_attribute_2: Can be used along with custom_attribute_1 to help you track a Capella task with your own metadata or internal systems. Default: None product_types: List of analytics to add to the order along with the imagery. Currently available analytics are Amplitude Change Detection and Vessel Detection. One of "ACD", "VS". Default: None repeat_start: Starting date (in UTC) when you would like data to begin being collected. Default: Now repeat_end: Starting date (in UTC) when you would like data to stop being collected. This and repetition_count are mutually exclusive; only one of them can be defined per request. Default: None repetition_interval: Number of days between the start of each derived request. Default: 7 repetition_count: Total number of acquisitions in the repeat series. This and repeat_end are mutually exclusive; only one of them can be defined per request. Default: None maintain_scene_framing: Whether to maintain consistent framing (look-direction, ascending/descending, orbital-plane) across all acquisitions. Default: False look_angle_tolerance: Tolerance to look-angle deviations across all acquisitions. Default: None collect_mode: Collect mode to be used by the satellite when making the collect. One of "spotlight", "stripmap", "sliding_spotlight". Default: "spotlight" look_direction: Constraint on view angle. One of "right", "left", "either". Default: "either" asc_dsc: Constraint on ascending/descending pass. One of "ascending", "descending", "either". Default: "either" orbital_planes: List of orbital planes allowed to service request. If empty any spacecraft in any plane can service request. One of 45, 53, 97. Default: None local_time: Times, in the timezone of the area where the image will be collected, during which the collect can be taken. Represented by a list of time ranges as seconds in the day. For example, [[21600, 64800]] would allow collects between 6 AM and 6 PM; [[0, 21600], [64800, 86400]] would allow collects between 6 PM and 12 AM as well as from 12 AM to 6 AM. Alternatively, you can pass string values of "day", "night", or "anytime" which are parsed to [[21600, 64800]], [[0, 21600], [64800, 86400]], and [[0, 86400]] respectively. Default: None off_nadir_min: Minimum off-nadir angle permitted. Must be less than off_nadir_max. Default: None off_nadir_max: Maximum off-nadir angle permitted. Must be greater than off_nadir_min. Default: None elevation_min: Minimum elevation angle permitted. Default: None elevation_max: Maximum elevation angle permitted. Default: None image_length: Image length. Default: None image_width: Image width. Default: None azimuth: Azimuth angle at collect mid-time. Clockwise angle from North to the spacecraft in the target frame of reference. Default: None grr_min: Minimum ground-range resolution. Minimum is in ordinal sense. Default: None grr_max: Maximum ground-range resolution. Maximum is in ordinal sense. Default: None srr_min: Minimum slant-range resolution. Minimum is in ordinal sense. Default: None srr_max: Maximum slant-range resolution. Maximum is in ordinal sense. Default: None azr_min: Minimum azimuth resolution. Minimum is in ordinal sense. Default: None azr_max: Maximum azimuth resolution. Maximum is in ordinal sense. Default: None nesz_max: Maximum allowable NESZ of resulting collect. Default: None num_looks: Number of looks to use in processing collect. Default: None polarization: Image polarization. One of "HH", "VV". Default: None Returns: Dict[str, Any]: created repeat request metadata """ return create_repeat_request(session=self._sesh, **kwargs)
# ORDER
[docs] def list_orders(self, *order_ids: Optional[str], is_active: Optional[bool] = False) -> List[Dict[str, Any]]: """ list orders Args: order_id: list only specific orders (variadic, specify multiple) is_active: list only active (non-expired) orders Returns: List[Dict[str, Any]]: metadata of orders """ orders = [] if order_ids: for order_id in order_ids: _validate_uuid(order_id) # prefilter non expired if is_active: orders = _get_non_expired_orders(session=self._sesh) if order_ids: set_order_ids = set(order_ids) orders = [o for o in orders if o["orderId"] in set_order_ids] else: # list all orders if not order_ids: params = { "customerId": self._sesh.customer_id, } resp = self._sesh.get("/orders", params=params) orders = resp.json() # list specific orders else: for order_id in order_ids: resp = self._sesh.get(f"/orders/{order_id}") orders.append(resp.json()) return orders
[docs] def get_stac_items_of_order(self, order_id: str, ids_only: bool = False) -> Union[List[str], SearchResult]: """ get stac items of an existing order Args: order_id: order id """ _validate_uuid(order_id) order_meta = self.list_orders(order_id)[0] stac_ids = [item["granuleId"] for item in order_meta["items"]] if ids_only: return stac_ids return self.search(ids=stac_ids)
def review_order( self, stac_ids: Optional[List[str]] = None, items: Optional[Union[List[Dict[str, Any]], SearchResult]] = None, ) -> Dict[str, Any]: stac_ids = _validate_stac_id_or_stac_items(stac_ids, items) logger.info(f"reviewing order for {', '.join(stac_ids)}") stac_items = items # type: ignore if not items: stac_items = self.search(ids=stac_ids) if not stac_items: raise NoValidStacIdsError(f"No valid STAC IDs in {', '.join(stac_ids)}") # review order order_payload = self._construct_order_payload(stac_items) review_order_response = self._sesh.post("/orders/review", json=order_payload).json() if not review_order_response.get("authorized", False): raise InsufficientFundsError(review_order_response["authorizationDenialReason"]["message"]) return review_order_response
[docs] def submit_order( self, stac_ids: Optional[List[str]] = None, items: Optional[Union[List[Dict[str, Any]], SearchResult]] = None, check_active_orders: bool = False, omit_search: bool = False, omit_review: bool = False, ) -> str: """ submit an order by `stac_ids` or `items`. NOTE: Precedence order (high to low) 1. stac_ids 2. items Args: stac_ids: STAC IDs that active order should include items: STAC items, returned by :py:meth:`search` check_active_orders: check if any active order containing ALL `stac_ids` is available if True: returns that order ID if False: submits a new order and returns new order ID omit_search: omit search to ensure provided STAC IDs are valid - only works if `items` are provided omit_review: omit review stage Returns: str: order UUID """ stac_ids = _validate_stac_id_or_stac_items(stac_ids, items) if check_active_orders: order_id = self._find_active_order(stac_ids) if order_id is not None: logger.info(f"found active order {order_id}") return order_id if stac_ids and not omit_search: stac_items = self.search(ids=stac_ids) else: if omit_search and not items: logger.warning("setting omit_search=True only works in combination providing items instead of stac_ids") stac_items = self.search(ids=stac_ids) else: stac_items = items # type: ignore if not stac_items: raise NoValidStacIdsError(f"No valid STAC IDs in {', '.join(stac_ids)}") if not omit_review: self.review_order(items=stac_items) logger.info(f"submitting order for {', '.join(stac_ids)}") order_payload = self._construct_order_payload(stac_items) res_order = self._sesh.post("/orders", json=order_payload) con = res_order.json() order_id = con["orderId"] if con["orderStatus"] == "rejected": raise OrderRejectedError(f"Order for {', '.join(stac_ids)} rejected.") logger.info(f"successfully submitted order {order_id}") return order_id # type: ignore
def _construct_order_payload(self, stac_items): by_collect_id = defaultdict(list) for item in stac_items: by_collect_id[item["collection"]].append(item["id"]) order_items = [] for collection, stac_ids_of_coll in by_collect_id.items(): order_items.extend([{"collectionId": collection, "granuleId": stac_id} for stac_id in stac_ids_of_coll]) return {"items": order_items} def _find_active_order(self, stac_ids: List[str]) -> Optional[str]: """ find active order containing ALL specified `stac_ids` Args: stac_ids: STAC IDs that active order should include """ order_id = None active_orders = _get_non_expired_orders(session=self._sesh) if not active_orders: return None for ord in active_orders: granules = set([i["granuleId"] for i in ord["items"]]) if granules.issuperset(stac_ids): order_id = ord["orderId"] break return order_id
[docs] def get_presigned_items( self, order_id: str, stac_ids: Optional[List[str]] = None, sort_by: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: """ get presigned items hrefs for all products contained in order Args: order_id: active order ID (see :py:meth:`submit_order`) stac_ids: filter presigned assets by STAC IDs sort_by: list of stac ids to sort by Returns: List[Dict[str, Any]]: List of assets of respective product, e.g. .. highlight:: python .. code-block:: python [ { "<asset_type>": { "title": ..., "href": ..., "type": ... }, ... } ] """ _validate_uuid(order_id) logger.info(f"getting presigned items for order {order_id}") response = self._sesh.get(f"/orders/{order_id}/download") presigned_stac_items = response.json() # ensure sort sort_by = _validate_and_filter_stac_ids(sort_by) if sort_by: presigned_stac_items = _sort_stac_items(items=presigned_stac_items, stac_ids=sort_by) # no filter if not stac_ids: return presigned_stac_items stac_ids_set = set(stac_ids) return [item for item in presigned_stac_items if item["id"] in stac_ids_set]
[docs] def get_presigned_assets( self, order_id: str, stac_ids: Optional[List[str]] = None, sort_by: Optional[List[str]] = None, assets_only: Optional[bool] = True, ) -> List[Dict[str, Any]]: """ get presigned assets hrefs for all products contained in order Args: order_id: active order ID (see :py:meth:`submit_order`) stac_ids: filter presigned assets by STAC IDs sort_by: list of stac ids to sort by Returns: List[Dict[str, Any]]: List of assets of respective product, e.g. .. highlight:: python .. code-block:: python [ { "<asset_type>": { "title": ..., "href": ..., "type": ... }, ... } ] """ if not assets_only: logger.warning( "`assets_only` is kept for backwards compatibility but has no effect. Please use `get_presigned_items` instead" ) items_presigned = self.get_presigned_items(order_id, stac_ids, sort_by) return [item["assets"] for item in items_presigned]
[docs] def get_asset_bytesize(self, pre_signed_url: str) -> int: """get size in bytes of `pre_signed_url`""" return _get_asset_bytesize(pre_signed_url)
# DOWNLOAD
[docs] def download_asset( self, pre_signed_url: str, local_path: Union[Path, str] = None, override: bool = False, show_progress: bool = False, ) -> Path: """ downloads a presigned asset url to disk Args: pre_signed_url: presigned asset url, see :py:meth:`get_presigned_items` local_path: local output path - file is written to OS's temp dir if not provided override: override already existing `local_path` show_progress: show download status progressbar """ dl_request = DownloadRequest( url=pre_signed_url, local_path=local_path, # type: ignore asset_key="asset", ) return _perform_download( download_requests=[dl_request], override=override, threaded=False, show_progress=show_progress, )["asset"]
[docs] def download_products( self, items_presigned: Optional[List[Dict[str, Any]]] = None, order_id: Optional[str] = None, tasking_request_id: Optional[str] = None, collect_id: Optional[str] = None, local_dir: Union[Path, str] = Path(tempfile.gettempdir()), include: Union[List[Union[str, AssetType]], str] = None, exclude: Union[List[Union[str, AssetType]], str] = None, override: bool = False, threaded: bool = True, show_progress: bool = False, separate_dirs: bool = True, product_types: List[Union[str, ProductType]] = None, ) -> Dict[str, Dict[str, Path]]: """ download all assets of multiple products Args: items_presigned: stac items with presigned assets, see :py:meth:`get_presigned_items` order_id: optionally provide `order_id` instead of `assets_presigned`, see :py:meth:`submit_order` tasking_request_id: tasking request UUID of the task request you wish to download all associated products for collect_id: collect UUID you wish to download all associated products for NOTE: Precedence order (high to low) 1. items_presigned 2. order_id 3. tasking_request_id 4. collect_id Meaning e.g. assets_presigned takes precedence over order_id, ... local_dir: local directory where assets are saved to, tempdir if not provided include: white-listing, which assets should be included, e.g. ["HH"] => only download HH asset exclude: black-listing, which assets should be excluded, e.g. ["HH", "thumbnail"] => download ALL except HH and thumbnail assets NOTE: explicit DENY overrides explicit ALLOW asset choices: * 'HH', 'VV', 'raster', 'metadata', 'thumbnail' (external) - raster == 'HH' || 'VV' * 'log', 'profile', 'stats', 'stats_plots' (internal) override: override already existing threaded: download assets of product in multiple threads show_progress: show download status progressbar separate_dirs: set to True in order to save the respective product assets into products directories, i.e. /tmp/<stac_id_1>/<stac_id_1>.tif /tmp/<stac_id_2>/<stac_id_2>.tif ... set to False in order to the respective product assets directly into the provided `local_dir`, i.e. /tmp/<stac_id_1>.tif /tmp/<stac_id_2>.tif ... product_types: filter by product type, e.g. ["SLC", "GEO"] Returns: Dict[str, Dict[str, Path]]: Local paths of downloaded files keyed by STAC id and asset type, e.g. .. highlight:: python .. code-block:: python { "stac_id_1": { "<asset_type>": <path-to-asset>, ... } } """ local_dir = Path(local_dir) one_of_required = (items_presigned, order_id, tasking_request_id, collect_id) if not any(map(bool, one_of_required)): raise ValueError("please provide one of assets_presigned, order_id, tasking_request_id or collect_id") product_types = _validate_and_filter_product_types(product_types) include = _validate_and_filter_asset_types(include) exclude = _validate_and_filter_asset_types(exclude) if not items_presigned: items_presigned = self._resolve_items_presigned(order_id, tasking_request_id, collect_id, product_types) len_items_presigned = len(items_presigned) suffix = "s" if len_items_presigned > 1 else "" # filter product_type if product_types: items_presigned = _filter_items_by_product_types(items_presigned, product_types) logger.info(f"downloading {len_items_presigned} product{suffix}") # gather download requests download_requests = [] by_stac_id = {} for cur_item in items_presigned: cur_download_requests = _gather_download_requests( cur_item["assets"], local_dir, include, exclude, separate_dirs ) by_stac_id[cur_download_requests[0].stac_id] = { cur.asset_key: cur.local_path for cur in cur_download_requests } download_requests.extend(cur_download_requests) if not download_requests: logger.warning("Nothing to download") return by_stac_id # type: ignore # download _perform_download( download_requests=download_requests, override=override, threaded=threaded, show_progress=show_progress, ) return by_stac_id # type: ignore
def _resolve_items_presigned( self, order_id: Optional[str] = None, tasking_request_id: Optional[str] = None, collect_id: Optional[str] = None, product_types: List[str] = None, ) -> List[Dict[str, Any]]: stac_ids = None # 1 - resolve assets_presigned from order_id if order_id: _validate_uuid(order_id) else: # 2 - submit order for tasking_request_id if tasking_request_id: _validate_uuid(tasking_request_id) order_id, stac_ids = self._order_products_for_task(tasking_request_id, product_types) # type: ignore # 3 - submit order for collect_id else: _validate_uuid(collect_id) order_id, stac_ids = self._order_products_for_collect_ids( collect_ids=[collect_id], product_types=product_types # type: ignore ) return self.get_presigned_items(order_id, stac_ids) # type: ignore def _order_products_for_task( self, tasking_request_id: str, product_types: List[str] = None ) -> Tuple[str, List[str]]: """ order all products associated with a tasking request Args: tasking_request_id: tasking request UUID you wish to order all associated products for """ # gather up all collect IDs associated of this task collect_ids = [coll["collectId"] for coll in self.get_collects_for_task(tasking_request_id)] return self._order_products_for_collect_ids(collect_ids, product_types) def _order_products_for_collect_ids( self, collect_ids: List[str], product_types: List[str] = None ) -> Tuple[str, List[str]]: search_kwargs = dict( collect_id__in=collect_ids, ) if product_types: search_kwargs["product_type__in"] = product_types result = self.search(**search_kwargs) if not result: logger.warning("No STAC items found ... aborting") sys.exit(0) order_id = self.submit_order(items=result, omit_search=True, check_active_orders=True) return order_id, result.stac_ids
[docs] def download_product( self, assets_presigned: Optional[Dict[str, Any]] = None, order_id: Optional[str] = None, local_dir: Union[Path, str] = Path(tempfile.gettempdir()), include: Union[List[Union[str, AssetType]], str] = None, exclude: Union[List[Union[str, AssetType]], str] = None, override: bool = False, threaded: bool = True, show_progress: bool = False, ) -> Dict[str, Path]: """ download all assets of a product (TO BE DEPRECATED) Args: assets_presigned: mapping of presigned assets of multiple products, see :py:meth:`get_presigned_assets` order_id: optionally provide `order_id` instead of `assets_presigned`, see :py:meth:`submit_order` NOTE: Precedence order (high to low) 1. assets_presigned 2. order_id local_dir: local directory where assets are saved to, tempdir if not provided include: white-listing, which assets should be included, e.g. ["HH"] => only download HH asset exclude: black-listing, which assets should be excluded, e.g. ["HH", "thumbnail"] => download ALL except HH and thumbnail assets NOTE: explicit DENY overrides explicit ALLOW asset choices: * 'HH', 'VV', 'raster', 'metadata', 'thumbnail' (external) Note: raster == 'HH' || 'VV' * 'log', 'profile', 'stats', 'stats_plots' (internal accessible only) override: override already existing threaded: download assets of product in multiple threads show_progress: show download status progressbar Returns: Dict[str, Path]: Local paths of downloaded files keyed by asset type, e.g. .. highlight:: python .. code-block:: python { "<asset_type>": <path-to-asset>, ... } """ logger.warning("this method will be deprecated in future revisions. Please use `download_products` instead.") if not assets_presigned and not order_id: raise ValueError("please provide either assets_presigned or order_id") if not assets_presigned: _validate_uuid(order_id) assets_presigned = self._get_first_presigned_from_order(order_id) include = _validate_and_filter_asset_types(include) exclude = _validate_and_filter_asset_types(exclude) download_requests = _gather_download_requests(assets_presigned, local_dir, include, exclude) # type: ignore if not download_requests: logger.warning("Nothing to download") return {} return _perform_download( download_requests=download_requests, override=override, threaded=threaded, show_progress=show_progress, )
@no_type_check def _get_first_presigned_from_order(self, order_id: str) -> Dict[str, Any]: assets_presigned = self.get_presigned_assets(order_id) if len(assets_presigned) > 1: logger.warning( f"order {order_id} contains {len(assets_presigned)} products - using first one ({_derive_stac_id(assets_presigned)})" ) return assets_presigned[0] # SEARCH
[docs] def search(self, **kwargs) -> SearchResult: """ paginated search for up to 500 matches (if no bigger limit specified) Find more information at https://docs.capellaspace.com/accessing-data/searching-for-data supported search filters: • bbox: List[float, float, float, float], e.g. [12.35, 41.78, 12.61, 42] • billable_area: Billable Area in m^2 • center_frequency: Union[int, float], Center Frequency (GHz) • collections: List[str], e.g. ["capella-open-data"] • collect_id: str, capella internal collect-uuid, e.g. '78616ccc-0436-4dc2-adc8-b0a1e316b095' • constellation: str, e.g. "capella" • datetime: str, e.g. "2020-02-12T00:00:00Z" • epsg: int, e.g. 32648 • frequency_band: str, Frequency band, one of "P", "L", "S", "C", "X", "Ku", "K", "Ka" • ids: List[str], e.g. `["CAPELLA_C02_SP_GEO_HH_20201109060434_20201109060437"]` • intersects: geometry component of the GeoJSON, e.g. {'type': 'Point', 'coordinates': [-113.1, 51.1]} • incidence_angle: Union[int, float], Center incidence angle, between 0 and 90 • instruments: List[str], leveraged instruments, e.g. ["capella-radar-5"] • instrument_mode: str, Instrument mode, one of "spotlight", "stripmap", "sliding_spotlight" • limit: int, default: 500 • local_datetime: str, local datetime, e.g. 2022-12-12TT07:37:42.324551+0800 • local_time: str, local time, e.g. 07:37:42.324551 • local_timezone: str, local timezone, e.g. Asia/Shanghai • look_angle: Union[int, float], e.g. 28.4 • looks_azimuth: int, e.g. 5 • looks_equivalent_number: int, Equivalent number of looks (ENL), e.g. 3 • looks_range: int, e.g. 5 • observation_direction: str, Antenna pointing direction, one of "right", "left" • orbit_state: str, Orbit State, one of "ascending", "descending" • orbital_plane: int, Orbital Plane, inclination angle of orbit • pixel_spacing_azimuth: Union[int, float], Pixel spacing azimuth (m), e.g. 0.5 • pixel_spacing_range: Union[int, float], Pixel spacing range (m), e.g. 0.5 • platform: str, e.g. "capella-2" • polarizations: str, one of "HH", "VV", "HV", "VH" • product_category: str, one of "standard", "custom", "extended" • product_type: str, one of "SLC", "GEO" • resolution_azimuth: float, Resolution azimuth (m), e.g. 0.5 • resolution_ground_range: float, Resolution ground range (m), e.g. 0.5 • resolution_range: float, Resolution range (m), e.g. 0.5 • squint_angle: float, Squint angle, e.g. 30.1 supported operations: • eq: equality search • in: within group • gt: greater than • gte: greater than equal • lt: lower than • lte: lower than equal sorting: • sortby: List[str] - must be supported fields, e.g. ["+datetime"] Returns: List[Dict[str, Any]]: STAC items matched """ search = StacSearch(session=self._sesh, **kwargs) return search.fetch_all()
def _get_non_expired_orders(session: CapellaConsoleSession) -> List[Dict[str, Any]]: params = {"customerId": session.customer_id} res = session.get("/orders", params=params) all_orders = res.json() ordered_by_exp_date = sorted(all_orders, key=lambda x: x["expirationDate"]) now = datetime.utcnow() active_orders = [] while ordered_by_exp_date: cur = ordered_by_exp_date.pop() cur_exp_date = dateutil.parser.parse(cur["expirationDate"], ignoretz=True) if cur_exp_date < now: break active_orders.append(cur) return active_orders