import collections.abc
import re
import pandas as pd
# Regular expressions used to recognise each kind of well identifier.
# Each entry is a list of patterns; callers iterate over them in order.
PATTERNS = {
    # Two capture groups: a four-digit group and a sequence-number group.
    "unit_no": [
        # Optional "G", four digits, optional hyphen, exactly five digits.
        r"G?(\d{4})-?(\d{5})",
        # Optional "G", four digits, mandatory hyphen, one to five digits.
        r"G?(\d{4})-(\d{1,5})",
    ],
    # One capture group of one to six digits.
    "dh_no": [
        r"(\d{1,6})",
    ],
    # Three letters, an optional space or hyphen, then one to three digits.
    "obs_no": [
        r"([a-zA-Z]{3})[ -]?(\d{1,3})",
    ],
}
class UnitNo:
    """Parse a well unit number.

    Arguments:
        *args (str or int): either the complete unit number or the map sheet and
            drillhole sequence numbers

    Example::

        >>> u1 = UnitNo("6628-123")
        >>> u2 = UnitNo("662800123")
        >>> u3 = UnitNo(662800123)
        >>> u4 = UnitNo("6628-00123")
        >>> u5 = UnitNo(6628, 123)
        >>> u6 = UnitNo("6628", "00123")
        >>> u7 = UnitNo("G662800123")
        >>> u1 == u2 == u3 == u4 == u5 == u6 == u7
        True

    Attributes:
        map (int): 10K map sheet
        seq (int): sequence number
        hyphen (str): hyphenated format e.g. "6628-123"
        long (str): zero-filled format e.g. "662800123"
        long_int (int/None): zero-filled format as integer e.g. 662800123 or
            None if missing
        wilma (str): WILMA style e.g. "6628-00123"
        hydstra (str): Hydstra style e.g. "G662800123"

    """

    def __init__(self, *args):
        self.map = None
        self.seq = None
        # Names exported by to_scalar_dict(), in output order.
        self._attributes = [
            "map",
            "seq",
            "hyphen",
            "long",
            "long_int",
            "wilma",
            "hydstra",
        ]
        self.set(*args)

    def set(self, *args):
        """See :class:`UnitNo` constructor for details of arguments.

        With two arguments they are converted with ``int()`` and stored as
        ``map`` and ``seq``. With one argument, a list/tuple is unpacked and
        re-processed; any other truthy value is converted to ``str`` and
        matched against the unit number patterns. A single falsy value, or
        the string ``"nan"``, leaves the current values untouched, as does
        any other number of arguments.

        Raises:
            ValueError: if a single truthy argument matches none of the
                unit number patterns.

        """
        if len(args) == 2:
            self.map = int(args[0])
            self.seq = int(args[1])
            return
        if len(args) != 1:
            return

        value = args[0]
        # "nan" is treated as a missing value. ``args`` is a tuple and cannot
        # be assigned to, so the check works on a local copy instead.
        if isinstance(value, str) and value == "nan":
            value = None
        if not value:
            return
        if isinstance(value, (list, tuple)):
            return self.set(*value)

        text = str(value)
        for pattern in PATTERNS["unit_no"]:
            match = re.match(pattern, text)
            if match:
                self.map = int(match.group(1))
                self.seq = int(match.group(2))
                return
        raise ValueError(
            "no identifier found in {}, "
            "check docs for accepted formats".format(value)
        )

    # Each formatted property returns "" while map/seq are still None,
    # because formatting None with an integer format code raises TypeError.

    @property
    def hyphen(self):
        try:
            return "{:d}-{:d}".format(self.map, self.seq)
        except TypeError:
            return ""

    @property
    def long(self):
        try:
            return "{:d}{:05d}".format(self.map, self.seq)
        except TypeError:
            return ""

    @property
    def long_int(self):
        if self.long:
            return int(self.long)
        else:
            return None

    @property
    def wilma(self):
        try:
            return "{:d}-{:05d}".format(self.map, self.seq)
        except TypeError:
            return ""

    @property
    def hydstra(self):
        try:
            return "G{:d}{:05d}".format(self.map, self.seq)
        except TypeError:
            return ""

    def __str__(self):
        return self.hyphen

    def __eq__(self, other):
        # Comparison is by string form, so a UnitNo also equals a matching
        # hyphenated string.
        return str(self) == str(other)

    def __hash__(self):
        return hash((self.map, self.seq))

    def __iter__(self):
        return iter((self.map, self.seq))

    def __bool__(self):
        return bool(self.map) and bool(self.seq)

    def to_scalar_dict(self):
        """Return a dict mapping each name in ``_attributes`` to its value."""
        return {attr: getattr(self, attr) for attr in self._attributes}
class ObsNo:
    """Parse an observation well identifier.

    Arguments:
        *args (str or int): either one string, which can be either in the format
            'YAT017' or 'YAT-17', etc.; or two values, either int or str, for
            the plan prefix (three letters referring to the hundred) and
            the sequence number. e.g. 'YAT', 17

    Example::

        >>> from sa_gwdata import ObsNo
        >>> o1 = ObsNo("YAT017")
        >>> o2 = ObsNo("YAT17")
        >>> o3 = ObsNo("YAT 17")
        >>> o4 = ObsNo("YAT", 17)
        >>> o1 == o2 == o3 == o4
        True

    Attributes:
        plan (str): hundred prefix
        seq (int): sequence number
        id (str): consistent zero-padded identifier e.g. "YAT017"
        egis (str): ENVGIS style e.g. "YAT 17"

    """

    def __init__(self, *args):
        self.plan = ""
        self.seq = None
        # Names exported by to_scalar_dict(), in output order.
        self._attributes = ["plan", "seq", "id", "egis"]
        self.set(*args)

    @classmethod
    def parse(cls, *args, **kwargs):
        """Parse an obs identifier, ignoring all parsing errors.

        Arguments are the same as those for the class constructor,
        but all exceptions are ignored.

        Returns: ObsNo.id if successful, a blank string if not.

        """
        try:
            obs_no = cls(*args, **kwargs)
        except Exception:
            # Best-effort parse: any ordinary error means "no identifier".
            # KeyboardInterrupt and SystemExit are deliberately not caught.
            return ""
        else:
            return obs_no.id

    def set(self, *args):
        """See :class:`ObsNo` constructor for details of arguments.

        With two arguments the first must be a ``str`` (the plan prefix) and
        the second is converted with ``int()``. With one argument, a
        list/tuple is unpacked and re-processed; any other truthy value is
        matched against the obs number patterns. A single falsy value, or the
        string ``"nan"``, leaves the current values untouched, as does any
        other number of arguments.

        Raises:
            ValueError: if a single truthy argument matches none of the obs
                number patterns, or if the first of two arguments is not a
                ``str``.

        """
        if len(args) == 2:
            if not isinstance(args[0], str):
                raise ValueError(
                    "first argument should be a str e.g. 'YAT', 'ADE', etc."
                )
            self.plan = args[0]
            self.seq = int(args[1])
            return
        if len(args) != 1:
            return

        value = args[0]
        # "nan" is treated as a missing value. ``args`` is a tuple and cannot
        # be assigned to, so the check works on a local copy instead.
        if isinstance(value, str) and value == "nan":
            value = None
        if not value:
            return
        if isinstance(value, (list, tuple)):
            return self.set(*value)

        for pattern in PATTERNS["obs_no"]:
            match = re.match(pattern, value)
            if match:
                self.plan = match.group(1)
                self.seq = int(match.group(2))
                return
        raise ValueError(
            "no identifier found in {}, "
            "check docs for accepted formats".format(value)
        )

    # Both formatted properties return "" while seq is still None, because
    # formatting None with a numeric format code raises TypeError.

    @property
    def id(self):
        try:
            return "{}{:03d}".format(self.plan.upper(), self.seq)
        except TypeError:
            return ""

    @property
    def egis(self):
        try:
            return "{} {:.0f}".format(self.plan.upper(), self.seq)
        except TypeError:
            return ""

    def __str__(self):
        return self.id

    def __eq__(self, other):
        # Comparison is by string form (the upper-cased id), so it is
        # case-insensitive with respect to the plan prefix.
        return str(self) == str(other)

    def __hash__(self):
        # Hash the upper-cased plan so that objects which compare equal
        # (e.g. "yat17" and "YAT17") also hash equal.
        return hash((self.plan.upper(), self.seq))

    def __iter__(self):
        return iter((self.plan, self.seq))

    def __bool__(self):
        return bool(self.plan) and bool(self.seq)

    def to_scalar_dict(self):
        """Return a dict mapping each name in ``_attributes`` to its value."""
        return {attr: getattr(self, attr) for attr in self._attributes}
[docs]class Well:
"""Represents a well.
Args:
dh_no (int): drillhole number (required)
unit_no (str/int): unit number (optional)
obs_no (str/int): obs number (optional)
Other keyword arguments will be set as attributes.
Attributes:
id (str): obs number if it exists, e.g. "NOA002", if not,
unit number e.g. "6628-123", and in the rare case that
a unit number does not exist, then drillhole no. e.g.
"200135".
title (str): available attributes including name, e.g.
"7025-3985 / WRG038 / WESTERN LAGOON".
obs_no (ObsNo): obs number
unit_no (UnitNo): unit number
"""
def __init__(self, *args, **kwargs):
self._attributes = []
self.unit_no = UnitNo()
self.obs_no = ObsNo()
self.name = ""
self.set(*args, **kwargs)
[docs] def set(self, dh_no, unit_no="", obs_no="", **kwargs):
"""See :class:`Well` constructor for docstring."""
self.dh_no = dh_no
self.set_unit_no(unit_no)
self.set_obs_no(obs_no)
for key, value in kwargs.items():
self.set_well_attribute(key, value)
def set_well_attribute(self, key, value):
key = key.lower()
self._attributes.append(key)
setattr(self, key, value)
[docs] def set_obs_no(self, *args):
"""Set obswell number.
Args are passed to :class:`ObsNo` constructor.
"""
self.obs_no.set(*args)
[docs] def set_unit_no(self, *args):
"""Set unit number.
Args are passed to :class:`UnitNo` constructor.
"""
self.unit_no.set(*args)
def __eq__(self, other):
if hasattr(other, "dh_no"):
return self.dh_no == other.dh_no
else:
return False
def __hash__(self):
return hash(self.dh_no)
def __bool__(self):
return bool(self.dh_no)
@property
def id(self):
if self.obs_no:
return self.obs_no
elif self.unit_no:
return self.unit_no
else:
return str(self.dh_no)
@property
def title(self):
names = [self.unit_no.hyphen]
if not names[0]:
names[0] = "[dh_no={:d}]".format(self.dh_no)
if self.obs_no:
names.append(self.obs_no.id)
if self.name:
names.append(self.name)
return " / ".join(names)
def __repr__(self):
if self.obs_no:
return f"'{str(self.obs_no)}'"
elif self.unit_hyphen:
return f"'{str(self.unit_hyphen)}'"
else:
return str(self.dh_no)
[docs] def to_scalar_dict(self):
"""Convert Well to a dictionary containing scalar values.
Returns: dict.
Guaranteed keys are "dh_no", "id", "title" and "name".
The keys present in `well.unit_no.to_scalar_dict()` will
be added with the prefix "unit_no.". Same for `obs_no`.
Any additional attributes will also be present.
"""
d = {"dh_no": self.dh_no, "id": self.id, "title": self.title, "name": self.name}
d.update(
{("unit_no." + k): v for k, v in self.unit_no.to_scalar_dict().items()}
)
d.update({("obs_no." + k): v for k, v in self.obs_no.to_scalar_dict().items()})
d.update({attr: getattr(self, attr) for attr in self._attributes})
return d
[docs] def path_safe_repr(self, remove_prefix=True):
"""Return title containing only characters which are allowed in
Windows path names."""
r = str(self)
for char in ["\\", "/", "?", ":", "*", '"', "<", ">", "|"]:
r = r.replace(char, "")
# This keyword argument now has no function.
# if remove_prefix:
# parts = r.split(")")
# r = " ".join(parts[1:])[1:]
return r
class Wells(collections.abc.MutableSequence):
    """Represents a set of wells.

    This is not meant to be instantiated here, but can be
    accessed from methods of other objects, such as
    :meth:`sa_gwdata.WaterConnectSession.find_wells`.

    Attributes:
        wells (list): list of :class:`sa_gwdata.Well` objects.

    All attributes of the contained Well objects will also be
    present as attributes on this object, returning lists of the
    values from the Well objects contained here. It sounds more
    complex than it is! Tab completion is enabled, so try it out
    in IPython and you will quickly see how it works.

    """

    def __init__(self, wells=None):
        if wells is None:
            wells = []
        self.wells = wells
        self._refresh()

    def __repr__(self):
        return repr(self.wells)

    def __len__(self):
        return len(self.wells)

    def __getitem__(self, ix):
        """Return a well by position or by identifier.

        An ``int`` smaller than the number of wells is used as a list index.
        Anything else is looked up as a drillhole number, obs number or
        hyphenated unit number; if that fails, ``str(ix)`` is parsed for well
        identifiers and the first one known to this collection is used.

        Raises:
            KeyError: if no contained well matches.

        """
        if isinstance(ix, int) and ix < len(self):
            return self.wells[ix]
        key = ix
        if key not in self._map:
            for _id_type, value in parse_well_ids_plaintext(str(key)):
                if value in self._map:
                    key = value
                    break
        return self._map[key]

    def __delitem__(self, ix):
        del self.wells[ix]
        self._refresh()

    def __setitem__(self, ix, value):
        self.wells[ix] = value
        # Keep the identifier lookup in step with the list, as every other
        # mutating method does.
        self._refresh()

    def insert(self, ix, value):
        self.wells.insert(ix, value)
        self._refresh()

    def append(self, value):
        self.wells.append(value)
        self._refresh()

    def count(self, item):
        return self.wells.count(item)

    def index(self, *args):
        return self.wells.index(*args)

    def __iter__(self):
        return iter(self.wells)

    def __getattr__(self, name):
        """Return the named Well attribute for every well, as a list.

        Raises:
            AttributeError: if the name is not a known Well attribute.

        """
        # Read ``_attributes`` from the instance dict directly. Going through
        # normal attribute access would re-enter __getattr__ and recurse
        # forever whenever ``_attributes`` has not been set yet.
        known = self.__dict__.get("_attributes", [])
        name = name.split(".")[0]
        if name in known:
            return self.df()[name].values.tolist()
        elif name in ["unit_no", "obs_no"]:
            return [getattr(w, name) for w in self]
        else:
            raise AttributeError(
                "Wells object does not have an attribute named '{}'".format(name)
            )

    def _refresh(self):
        """Rebuild the attribute name list and the identifier lookup."""
        if len(self):
            # Attribute names are taken from the first well only.
            self._attributes = list(self[0].to_scalar_dict().keys())
        else:
            self._attributes = []
        # One lookup keyed by drillhole number, obs number and hyphenated
        # unit number; blank identifiers are left out.
        self._map = {w.dh_no: w for w in self}
        self._map.update({w.obs_no.id: w for w in self if w.obs_no.id})
        self._map.update({w.unit_no.hyphen: w for w in self if w.unit_no.hyphen})

    def __dir__(self):
        # Advertise the Well attribute names (the part before any ".") so
        # that tab completion finds them.
        return sorted(
            list({k.split(".")[0] for k in self._attributes}) + super().__dir__()
        )

    def df(self):
        """Return information contained in each Well as a table.

        Returns: pd.DataFrame

        The columns of the returned DataFrame will always contain
        the "dh_no", "id", "title" attributes from the contained
        Well objects.

        Additional columns in the form "unit_no." + key will exist
        for all the keys in :meth:`UnitNo.to_scalar_dict`. Same for
        :meth:`ObsNo.to_scalar_dict`.

        Remaining columns depend on the additional attributes present
        on the contained Well objects.

        """
        return pd.DataFrame([w.to_scalar_dict() for w in self])
def parse_well_ids(input_text, **kwargs):
    """Parse well identifiers out of free text.

    Carriage return characters are removed from the text, which is then
    handed to :func:`parse_well_ids_plaintext`; that function's result is
    returned unchanged.

    Args:
        input_text (str): the text to parse

    Other keyword arguments are passed to :func:`parse_well_ids_plaintext`.

    Example of acceptable formats:

        662800125
        6628-125
        G662800125
        6628-00125
        SLE 15
        SLE015
        SLE15

    """
    without_carriage_returns = input_text.replace("\r", "")
    return parse_well_ids_plaintext(without_carriage_returns, **kwargs)
def parse_well_ids_plaintext(
    input_text,
    types=("unit_no", "obs_no"),
    unit_no_prefix="",
    obs_no_prefix="",
    dh_re_prefix=r"\A",
):
    """Parse possible well identifiers out of plain text.

    Arguments:
        input_text (str): the text to parse well identifiers from.
            Can include multiple lines.
        types (tuple): types of identifiers to look for. Currently
            supported: "unit_no", "obs_no", "dh_no"
        unit_no_prefix (str): regexp pattern prepended to each unit_no
            regexp before searching
        obs_no_prefix (str): regexp pattern prepended to each obs_no
            regexp before searching
        dh_re_prefix (str): regexp pattern required before a dh_no
            regexp will match

    Returns: a list of tuples e.g.

        >>> from sa_gwdata import parse_well_ids
        >>> parse_well_ids('sle15')
        [('obs_no', 'SLE015')]
        >>> parse_well_ids('6628150')
        []
        >>> parse_well_ids('6628-150')
        [('unit_no', '6628-150')]
        >>> parse_well_ids('662800150')
        [('unit_no', '6628-150')]
        >>> parse_well_ids('259001', types=["dh_no"])
        [('dh_no', '259001')]

    Results are grouped by type in the order unit_no, dh_no, obs_no,
    regardless of the order given in ``types``.

    Remember this doesn't actually check whether these identifiers refer to
    a well in the real world; it just parses a string of text to find
    possible well identifiers. It's pretty robust:

        >>> parse_well_ids("SLE 15, SLE16, and also maybe 5910-1")
        [('unit_no', '5910-1'), ('obs_no', 'SLE015'), ('obs_no', 'SLE016'), ('obs_no', 'YBE591')]

    It has unfortunately matched "ybe 591" from the phrase "maybe 5910-1" as an
    obs_no.

    """
    # WARNING: make sure you update any keyword arguments in WaterConnectSession.find_wells()
    input_text = " " + input_text + " "
    well_ids = []

    if "unit_no" in types:
        # Several unit_no patterns can match the same stretch of text (e.g.
        # "6628-00150" satisfies both). Remember which spans earlier patterns
        # have used so each piece of text gives at most one unit number.
        used_spans = []
        for pattern in PATTERNS["unit_no"]:
            for match in re.finditer(unit_no_prefix + pattern, input_text):
                start, end = match.span()
                if any(start < e and s < end for s, e in used_spans):
                    continue
                used_spans.append((start, end))
                groups = match.groups(default="")
                well_ids.append(
                    ("unit_no", "{}-{:.0f}".format(groups[0], int(groups[1])))
                )

    if "dh_no" in types:
        # Drillhole numbers are searched for one whitespace-separated token
        # at a time.
        tokens = input_text.split()
        for pattern in PATTERNS["dh_no"]:
            dh_regex = re.compile(dh_re_prefix + pattern)
            for token in tokens:
                match = dh_regex.search(token)
                if match:
                    well_ids.append(("dh_no", match.group()))

    if "obs_no" in types:
        for pattern in PATTERNS["obs_no"]:
            for match in re.finditer(obs_no_prefix + pattern, input_text):
                groups = match.groups(default="")
                well_ids.append(
                    ("obs_no", "{}{:03.0f}".format(groups[0].upper(), int(groups[1])))
                )

    return well_ids