Source code for sa_gwdata.waterconnect

import io
import json
import logging
import time

import pandas as pd
import requests

from sa_gwdata.identifiers import parse_well_ids, UnitNo, ObsNo, Well, Wells

__all__ = (
    "WaterConnectSession",
    "Response",
    "find_wells",
    "water_levels",
    "salinities",
    "drillers_logs",
)


logger = logging.getLogger(__name__)


[docs]class Response(object): def __init__(self, response, **kwargs): """Groundwater Data HTTP response. Args: response (requests.Response object): the HTTP response """ self.response = response for key, value in kwargs.items(): setattr(self, key, value) @property def r(self): """Return the HTTP requests.Response object.""" return self.response @property def json(self): """Convert the response to JSON. Returns a dict/list.""" if not hasattr(self, "_json"): self._json = json.loads(self.response.text) return self._json @property def df(self): """If the response is a list, convert to a pandas DataFrame with columns converted into lowercase.""" if not hasattr(self, "_df"): df = pd.DataFrame(self.json).rename(columns=str.lower) self._df = df return self._df @property def df_exists(self): """Check if JSON can be converted to a DataFrame. Returns bool.""" if isinstance(self.json, list): return True return False def gdf(self, x_col="lon", y_col="lat"): from shapely.geometry import Point import geopandas as gpd df = self.df return gpd.GeoDataFrame( df, geometry=df.apply(lambda x: Point(x[x_col], x[y_col]), axis=1) )
def get_global_session(force_new=False): if not "__waterconnect_session" in globals(): global __waterconnect_session __waterconnect_session = WaterConnectSession() else: if force_new: del globals()["__waterconnect_session"] __waterconnect_session = WaterConnectSession() return __waterconnect_session def find_wells(input_text, **kwargs): """Find wells and retrieve some summary information. Args: input_text (str): any well identifiers to parse. See :func:`sa_gwdata.parse_well_ids_plaintext` for details of other keyword arguments you can pass here. For example: >>> import sa_gwdata >>> wells = sa_gwdata.find_wells("yat99 5840-46 ULE205") ... >>> wells ['MLC008', 'ULE205', 'YAT099'] """ session = get_global_session() return session.find_wells(input_text, **kwargs) def water_levels(wells, session=None, **kwargs): """Get table of water level measurements for wells. Args: wells (list): list of drillhole numbers (ints) or :class:`sa_gwdata.Well` objects Returns: pandas DataFrame. """ session = get_global_session() return session.bulk_water_levels(wells, **kwargs) def salinities(wells, session=None, **kwargs): """Get table of salinity samples for wells. Args: wells (list): list of drillhole numbers (ints) or :class:`sa_gwdata.Well` objects Returns: pandas DataFrame. """ if session is None: session = get_global_session() return session.bulk_salinities(wells, **kwargs) def drillers_logs(wells, session=None, **kwargs): """Get table of lithological intervals from drillers logs for wells. Args: wells (list): list of drillhole numbers (ints) or :class:`sa_gwdata.Well` objects Returns: pandas DataFrame. """ if session is None: session = get_global_session() return session.bulk_drillers_logs(wells, **kwargs)
[docs]class WaterConnectSession(requests.Session): """Wrapper around repeated requests to Groundwater Data. Args: endpoint (str): url endpoint for API, optional sleep (int): minimum interval between requests in seconds. Be nice, do not reduce it. verify (bool): require valid SSL certificate Other args and kwargs are passed to request.Session constructor. Usage: >>> from sa_gwdata import WaterConnectSession >>> with WaterConnectSession() as s: ... df = s.get("GetObswellNetworkData", params={"Network": "CENT_ADEL"}) """ well_id_cols = { "dhno": "drillhole_no", "obsnumber": "obs_no", "mapnum": "unit_long", } def __init__( self, *args, endpoint=None, sleep=2, verify=True, load_list_data=True, **kwargs ): super().__init__(*args, **kwargs) self.well_cache = pd.DataFrame(columns=set(self.well_id_cols.values())) self.verify = verify if not endpoint: endpoint = "https://www.waterconnect.sa.gov.au/_layouts/15/dfw.sharepoint.wdd/{app}.ashx/" self.endpoint = endpoint self.last_request = time.time() - sleep self.sleep = sleep if load_list_data: self.refresh_available_groupings()
[docs] def get(self, path, app="WDDDMS", verify=None, **kwargs): """HTTP GET verb to Groundwater Data. Args: path (str): final portion of URL path off the end of self.endpoint e.g. to GET ``https://www.waterconnect.sa.gov.au/_layouts/15/dfw.sharepoint.wdd /WDDDMS.ashx/GetAdvancedListsData`` then you would use ``path="GetAdvancedListsData"``. """ if verify is None: verify = self.verify t_remain = self.sleep - (time.time() - self.last_request) if t_remain > 0: logger.debug("Waiting {} sec".format(t_remain)) time.sleep(t_remain) if not path.startswith(self.endpoint): path = self.endpoint + path path = path.format(app=app) logger.debug("GET {} verify={}".format(path, verify)) response = super().get(path, verify=verify, **kwargs) self.last_request = time.time() endpoint, name = path.rsplit("/", 1) logger.debug("Response content = {}".format(response.content)) return self._cache_data(Response(response, endpoint=endpoint, name=name))
[docs] def post(self, path, app="WDDDMS", verify=None, **kwargs): # TODO: Implement _cache_data for CSV bulk data formats... ? if verify is None: verify = self.verify t_remain = self.sleep - (time.time() - self.last_request) if t_remain > 0: logger.debug("Waiting {} sec".format(t_remain)) time.sleep(t_remain) if not path.startswith(self.endpoint): path = self.endpoint + path path = path.format(app=app) logger.debug("POST {} verify={}".format(path, verify)) response = super().post(path, verify=verify, **kwargs) self.last_request = time.time() endpoint, name = path.rsplit("/", 1) return Response(response, endpoint=endpoint, name=name)
def bulk_download_wells(self, service, wells, **kwargs): return bulk_download(service, {"DHNOs": wells.dh_no}, **kwargs) def bulk_download(self, service, json_data, format="CSV"): r = self.post( "{service}?bulkOutput={format}".format(service=service, format=format), data={"exportdata": json.dumps(json_data)}, ) print(r.response.content) with io.BytesIO(r.response.content) as buffer: df = pd.read_csv(buffer) return df def bulk_water_levels(self, wells, **kwargs): dh_nos = [w for w in wells] if len(wells): if hasattr(wells[0], "dh_no"): dh_nos = [w.dh_no for w in wells] df = self.bulk_download("GetWaterLevelDownload", {"DHNOs": dh_nos}) return df def bulk_salinities(self, wells, **kwargs): dh_nos = [w for w in wells] if len(wells): if hasattr(wells[0], "dh_no"): dh_nos = [w.dh_no for w in wells] df = self.bulk_download("GetSalinityDownload", {"DHNOs": dh_nos}) return df def bulk_drillers_logs(self, wells, **kwargs): dh_nos = [w for w in wells] if len(wells): if hasattr(wells[0], "dh_no"): dh_nos = [w.dh_no for w in wells] df = self.bulk_download("GetDrillersLogDownload", {"DHNOs": dh_nos}) return df def _cache_data(self, response): if response.df_exists: rdf = response.df cols_present = list( set(self.well_id_cols.keys()).intersection(set(rdf.columns)) ) rdf2 = rdf[cols_present].rename(columns=self.well_id_cols) self.well_cache = ( pd.concat([self.well_cache, rdf2]) .drop_duplicates() .sort_values("unit_long") ) return response
[docs] def find_wells(self, input_text, **kwargs): """Find wells and retrieve some summary information. Args: input_text (str): any well identifiers to parse. See :func:`sa_gwdata.parse_well_ids_plaintext` for details of other keyword arguments you can pass here. For example: >>> from sa_gwdata import WaterConnectSession >>> with WaterConnectSession() as s: ... wells = s.find_wells("yat99 5840-46 ULE205") ... >>> wells ['MLC008', 'ULE205', 'YAT099'] """ ids = parse_well_ids(input_text, **kwargs) dh_nos = [x for id_type, x in ids if id_type == "dh_no"] unit_nos = [x for id_type, x in ids if id_type == "unit_no"] obs_nos = [x for id_type, x in ids if id_type == "obs_no"] r1 = self.get("GetUnitNumberSearchData", params={"MAPNUM": ",".join(unit_nos)}) r2 = self.get( "GetObswellNumberSearchData", params={"OBSNUMBER": ",".join(obs_nos)} ) df = ( pd.concat([r1.df, r2.df], sort=False) .drop_duplicates() .rename( columns={"dhno": "dh_no", "mapnum": "unit_no", "obsnumber": "obs_no"} ) ) for key in ["obs_no", "name"]: if not key in df: df[key] = "" df.loc[df[key].isnull(), key] = "" return Wells([Well(**r.to_dict()) for _, r in df.iterrows()])
def find_wells_in_lat_lon(self, lats, lons): lons = sorted(lons) lats = sorted(lats) dfs = [] coords = [lats[0], lons[0], lats[1], lons[1]] box = ",".join(["{:.4f}".format(c) for c in coords]) r = self.get( "GetGridData?Box={box}".format(box=box), ) df = r.df.drop_duplicates().rename( columns={"dhno": "dh_no", "mapnum": "unit_no", "obsnumber": "obs_no"} ) for key in ["obs_no", "name", "unit_no"]: if not key in df: df[key] = "" df.loc[df[key].isnull(), key] = "" return Wells([Well(**r.to_dict()) for _, r in df.iterrows()])
[docs] def refresh_available_groupings(self): """Load lists data from API. Stores them in the attributes aquifers, networks, nrm_regions, pwas, pwras. """ response = self.get("GetAdvancedListsData") self.aquifers = { item["V"]: item["T"].replace((item["V"] + ": "), "") for item in response.json["Aquifer"] } self.networks = {item["V"]: item["T"] for item in response.json["Networks"]} self.nrm_regions = { item["V"]: item["T"] + " NRM Region" for item in response.json["NRMRegion"] } self.pwas = { item["V"]: item["V"] + " PWA" for item in response.json["PrescribedArea"] } self.pwras = { item["V"]: item["V"] + " PWRA" for item in response.json["PrescribedWRArea"] }
def data_pwa(self, pwa, swl_status=None, tds_status=None): q = [] if not swl_status is None: q.append("SWLSTATUS='{}'".format("C" if swl_status else "H")) if not tds_status is None: q.append("SALSTATUS='{}'".format("H" if tds_status else "H")) return self.get( "GetPWASearchData?PWA={pwa}&Q={q}".format(pwa=pwa, q="%20AND%20".join(q)) )
# SALINITY - can join by AND # GetPWASearchData?PWA=Angas-Bremer&Q=SALSTATUS='C' # GetPWASearchData?PWA=Angas-Bremer&Q=SWLSTATUS='C'%20AND%20SALSTATUS='C'