# Source code for sa_gwdata.waterconnect

import json
import logging
import time

import pandas as pd
import requests


# Public names exported by ``from <this module> import *``.
__all__ = ("WaterConnectSession", "Response")


# Module-level logger, named after this module; WaterConnectSession.get uses
# it for debug messages about request pacing and the URLs being fetched.
logger = logging.getLogger(__name__)


class Response(object):
    """Thin wrapper around an HTTP response from Groundwater Data.

    The raw ``requests.Response`` is kept on ``self.response``; the decoded
    JSON body and its DataFrame form are computed lazily on first access and
    then cached on the instance.
    """

    def __init__(self, response, **kwargs):
        """Groundwater Data HTTP response.

        Args:
            response (requests.Response object): the HTTP response
            **kwargs: each keyword is stored as an attribute of the same
                name on this object.

        """
        self.response = response
        for key, value in kwargs.items():
            setattr(self, key, value)

    @property
    def r(self):
        """Return the HTTP requests.Response object."""
        return self.response

    @property
    def json(self):
        """Convert the response to JSON. Returns a dict/list."""
        # Decode the body only once; later accesses reuse the cached object.
        if not hasattr(self, "_json"):
            self._json = json.loads(self.response.text)
        return self._json

    @property
    def df(self):
        """Return the JSON body as a pandas DataFrame.

        Column names are converted to lowercase. Check :attr:`df_exists`
        first: the conversion is only expected to make sense when the JSON
        body is a list.

        """
        if not hasattr(self, "_df"):
            self._df = pd.DataFrame(self.json).rename(columns=str.lower)
        return self._df

    @property
    def df_exists(self):
        """Check if JSON can be converted to a DataFrame. Returns bool."""
        return isinstance(self.json, list)
class WaterConnectSession(requests.Session):
    """Wrapper around repeated requests to Groundwater Data.

    Requests are rate-limited: :meth:`get` waits until at least ``sleep``
    seconds have passed since the previous request. Well identifier columns
    found in list-shaped responses are accumulated in ``self.well_cache``.

    Note that constructing a session immediately performs one HTTP request
    (see :meth:`refresh_available_groupings`).

    Args:
        endpoint (str): url endpoint for API, optional
        sleep (int): minimum interval between requests in seconds. Be nice,
            do not reduce it.
        verify (bool): require valid SSL certificate

    Other args and kwargs are passed to request.Session constructor.

    Usage:

        >>> from sa_gwdata import WaterConnectSession
        >>> with WaterConnectSession() as s:
        ...     r = s.get("GetObswellNetworkData", params={"Network": "CENT_ADEL"})
        ...     df = r.df

    """

    # Maps lowercase response column names to the names used in well_cache.
    well_id_cols = {
        "dhno": "drillhole_no",
        "obsnumber": "obs_no",
        "mapnum": "unit_long",
    }

    def __init__(self, *args, endpoint=None, sleep=2, verify=True, **kwargs):
        super().__init__(*args, **kwargs)
        # Use an ordered list rather than a set so the column order is
        # deterministic and so the DataFrame constructor accepts it.
        self.well_cache = pd.DataFrame(columns=list(self.well_id_cols.values()))
        self.verify = verify
        if not endpoint:
            endpoint = (
                "https://www.waterconnect.sa.gov.au"
                "/_layouts/15"
                "/dfw.sharepoint.wdd/WDDDMS.ashx/"
            )
        self.endpoint = endpoint
        # Back-date the last request so the first call is not delayed.
        self.last_request = time.time() - sleep
        self.sleep = sleep
        self.refresh_available_groupings()

    def get(self, path, verify=None, **kwargs):
        """HTTP GET verb to Groundwater Data.

        Args:
            path (str): final portion of URL path off the end of
                self.endpoint e.g. to GET
                ``https://www.waterconnect.sa.gov.au/_layouts/15/dfw.sharepoint.wdd/WDDDMS.ashx/GetAdvancedListsData``
                then you would use ``path="GetAdvancedListsData"``. A path
                that already starts with ``self.endpoint`` is used as is.
            verify (bool): overrides ``self.verify`` for this request when
                not None.
            **kwargs: passed through to ``requests.Session.get``.

        Returns:
            Response: wrapper around the HTTP response, with ``endpoint``
            and ``name`` attributes set from the requested URL split at its
            final ``/``.

        """
        if verify is None:
            verify = self.verify

        # Honour the minimum interval between consecutive requests.
        t_remain = self.sleep - (time.time() - self.last_request)
        if t_remain > 0:
            logger.debug("Waiting %s sec", t_remain)
            time.sleep(t_remain)

        if not path.startswith(self.endpoint):
            path = self.endpoint + path

        logger.debug("GET %s verify=%s", path, verify)
        response = super().get(path, verify=verify, **kwargs)
        self.last_request = time.time()

        endpoint, name = path.rsplit("/", 1)
        return self._cache_data(Response(response, endpoint=endpoint, name=name))

    def _cache_data(self, response):
        """Add any well identifier columns in *response* to ``well_cache``.

        Only responses whose JSON is a list are considered. Returns the
        response unchanged so the call can be chained.

        """
        if response.df_exists:
            rdf = response.df
            # Ordered list (not a set): a set is not a valid column indexer
            # and would make the column order nondeterministic.
            cols_present = [col for col in self.well_id_cols if col in rdf.columns]
            rdf2 = rdf[cols_present].rename(columns=self.well_id_cols)
            self.well_cache = (
                pd.concat([self.well_cache, rdf2], sort=False)
                .drop_duplicates()
                .sort_values("unit_long")
            )
        return response

    def refresh_available_groupings(self):
        """Load lists data from API.

        Stores them in the attributes networks, nrm_regions, pwas, pwras.
        Each is a dict keyed by the item's ``"V"`` field.

        """
        lists = self.get("GetAdvancedListsData").json
        self.networks = {item["V"]: item["T"] for item in lists["Networks"]}
        self.nrm_regions = {
            item["V"]: item["T"] + " NRM Region" for item in lists["NRMRegion"]
        }
        self.pwas = {item["V"]: item["V"] + " PWA" for item in lists["PrescribedArea"]}
        self.pwras = {
            item["V"]: item["V"] + " PWRA" for item in lists["PrescribedWRArea"]
        }

    def data_pwa(self, pwa, swl_status=None, tds_status=None):
        """Query ``GetPWASearchData`` for one prescribed wells area.

        Args:
            pwa (str): value sent as the ``PWA`` query parameter.
            swl_status (bool): if not None, filter on ``SWLSTATUS``: ``'C'``
                when truthy, ``'H'`` when falsy (presumably current vs
                historical -- confirm against the API).
            tds_status (bool): if not None, filter on ``SALSTATUS`` in the
                same way.

        Returns:
            Response: the result of :meth:`get`. Filters are joined with
            ``AND``; with no filters the ``Q`` parameter is sent empty.

        """
        q = []
        if swl_status is not None:
            q.append("SWLSTATUS='{}'".format("C" if swl_status else "H"))
        if tds_status is not None:
            # Previously both branches produced 'H', so a truthy tds_status
            # could never select 'C'.
            q.append("SALSTATUS='{}'".format("C" if tds_status else "H"))
        return self.get(
            "GetPWASearchData?PWA={pwa}&Q={q}".format(pwa=pwa, q="%20AND%20".join(q))
        )
# Salinity (SALSTATUS) and water level (SWLSTATUS) filters can be joined by AND, e.g.:
# GetPWASearchData?PWA=Angas-Bremer&Q=SALSTATUS='C'
# GetPWASearchData?PWA=Angas-Bremer&Q=SWLSTATUS='C'%20AND%20SALSTATUS='C'