# coding=utf-8
"""
This module facilitates access to velocity model data.
.. todo::
Make a ScalarField class to abstract behaviour in this class.
.. autoclass:: VelocityModel
:special-members:
:private-members:
:members:
"""
import numpy as np
import pandas as pd
from . import constants as _constants
from . import coords as _coords
from . import geometry as _geometry
[docs]class VelocityModel(object):
"""
A callable class providing a queryable container for seismic
velocities in a 3D volume.
:param str inf: path to input file containing phase velocity
data
:param str fmt: format of input file
"""
[docs] def __init__(self, inf=None, fmt=None, topo=None, **kwargs):
if inf is None:
return
if topo is None:
self.topo = lambda _, __: _constants.EARTH_RADIUS
else:
self.topo = topo
if fmt.upper() == "FANG":
self._read_fang(inf, **kwargs)
elif fmt.upper() in ("FM3D", "FMM3D"):
raise(NotImplementedError(f"Unrecognized format - {fmt}"))
self._read_fmm3d(inf, **kwargs)
elif fmt.upper() in ("UCVM", "SCEC-UCVM"):
self._read_ucvm(inf, **kwargs)
elif fmt.upper() == "NPZ":
self._read_npz(inf)
else:
raise(ValueError(f"Unrecognized format - {fmt}"))
[docs] def from_DataFrame(self, df):
"""
Initialize VelocityModel from a pandas.DataFrame. Input
DataFrame must have *lat*, *lon*, *depth*, *Vp*, and *Vs*,
fields.
:param pandas.DataFrame df: DataFrame with velocity data
"""
df["R"] = df["T"] = df["P"] = np.nan
spher = _coords.as_geographic(df[["lat", "lon", "depth"]]
).to_spherical()
df.loc[:, ["R", "T", "P"]] = spher
df = df.sort_values(["R", "T", "P"])
nR = len(df.drop_duplicates("R"))
nT = len(df.drop_duplicates("T"))
nP = len(df.drop_duplicates("P"))
nodes = df[["R", "T", "P"]].values.reshape(nR, nT, nP, 3)
self._nodes = _coords.as_spherical(nodes)
Vp = df["Vp"].values.reshape(nR, nT, nP)
Vs = df["Vs"].values.reshape(nR, nT, nP)
self._Vp = Vp
self._Vs = Vs
return(self)
def to_DataFrame(self):
df = pd.DataFrame().from_dict({"R": self._nodes[...,0].flatten(),
"T": self._nodes[...,1].flatten(),
"P": self._nodes[...,2].flatten(),
"Vp": self._Vp.flatten(),
"Vs": self._Vs.flatten()})
df["lat"] = df["lon"] = df["depth"] = np.nan
geo = _coords.as_spherical(df[["R", "T", "P"]]).to_geographic()
df.loc[:, ["lat", "lon", "depth"]] = geo
df = df.sort_values(["lat", "lon", "depth"]).reset_index()
return(df[["lat", "lon", "depth", "Vp", "Vs", "R", "T", "P"]])
[docs] def __call__(self, phase, lat, lon, depth):
"""
Return **phase**-velocity at given coordinates. A NULL value
(-1) is returned for points above the surface.
:param str phase: phase
:param float lat: latitude
:param float lon: longitude
:param float depth: depth
:returns: **phase**-velocity at (**lat**, **lon**, **depth**)
:rtype: float
"""
rho, theta, phi = _coords.as_geographic([lat, lon, depth]).to_spherical()
return(self._get_V(phase, rho, theta, phi))
def save(self, outf):
np.savez(outf, nodes=self._nodes, Vp=self._Vp, Vs=self._Vs)
def _read_npz(self, inf):
inf = np.load(inf)
self._nodes = _coords.as_spherical(inf["nodes"])
self._Vp = inf["Vp"]
self._Vs = inf["Vs"]
def _read_ucvm(self, inf, Vp_key="cmb_vp", Vs_key="cmb_vs"):
names=["lon", "lat", "Z", "surf", "vs30", "crustal", "cr_vp", "cr_vs",
"cr_rho", "gtl", "gtl_vp", "gtl_vs", "gtl_rho", "cmb_algo",
"cmb_vp", "cmb_vs", "cmb_rho"]
df = pd.read_table(inf,
delim_whitespace=True,
header=None,
names=names)
df["depth"] = df["Z"]*1e-3
df["Vp"] = df[Vp_key]*1e-3
df["Vs"] = df[Vs_key]*1e-3
df["R"] = df["T"] = df["P"] = np.nan
spher = _coords.as_geographic(df[["lat", "lon", "depth"]]
).to_spherical()
df.loc[:, ["R", "T", "P"]] = spher
df = df.sort_values(["R", "T", "P"])
nR = len(df.drop_duplicates("R"))
nT = len(df.drop_duplicates("T"))
nP = len(df.drop_duplicates("P"))
nodes = df[["R", "T", "P"]].values.reshape(nR, nT, nP, 3)
self._nodes = _coords.as_spherical(nodes)
Vp = df["Vp"].values.reshape(nR, nT, nP)
Vs = df["Vs"].values.reshape(nR, nT, nP)
self._Vp = Vp
self._Vs = Vs
def _read_fang(self, inf):
with open(inf) as inf:
lon = np.array([float(v) for v in inf.readline().split()])
lat = np.array([float(v) for v in inf.readline().split()])
depth = np.array([float(v) for v in inf.readline().split()])
LAT, LON, DEPTH = np.meshgrid(lat, lon, depth, indexing="ij")
VVp = np.zeros(LAT.shape)
VVs = np.zeros(LAT.shape)
for idepth in range(len(depth)):
for ilat in range(len(lat)):
VVp[ilat, :, idepth] = np.array([float(v) for v in inf.readline().split()])
for idepth in range(len(depth)):
for ilat in range(len(lat)):
VVs[ilat, :, idepth] = np.array([float(v) for v in inf.readline().split()])
spher = _coords.as_geographic(np.stack([LAT.flatten(),
LON.flatten(),
DEPTH.flatten()],
axis=1)
).to_spherical()
df = pd.DataFrame.from_dict({"R": spher[:,0],
"T": spher[:,1],
"P": spher[:,2],
"Vp": VVp.flatten(),
"Vs": VVs.flatten()})
df = df.sort_values(["R", "T", "P"])
nR = len(df.drop_duplicates("R"))
nT = len(df.drop_duplicates("T"))
nP = len(df.drop_duplicates("P"))
self._nodes = df[["R", "T", "P"]].values.reshape(nR, nT, nP, 3)
Vp = df["Vp"].values.reshape(nR, nT, nP)
Vs = df["Vs"].values.reshape(nR, nT, nP)
self._Vp = Vp
self._Vs = Vs
def _get_V(self, phase: str, rho: float, theta: float, phi: float)->float:
phase = _verify_phase(phase)
if phase == "P":
VV = self._Vp
elif phase == "S":
VV = self._Vs
else:
raise(ValueError(f"Unrecognized phase type: {phase}"))
idx = np.nonzero(self._nodes[:,0,0,0] == rho)[0]
if idx.size > 0:
iR0, iR1 = idx[0], idx[0]
else:
idxl = np.nonzero(self._nodes[:,0,0,0] < rho)[0]
idxr = np.nonzero(self._nodes[:,0,0,0] > rho)[0]
if not np.any(idxl):
iR0, iR1 = 0, 0
elif not np.any(idxr):
iR0, iR1 = -1, -1
else:
iR0, iR1 = idxl[-1], idxr[0]
if iR0 == iR1:
dR, drho = 1, 0
else:
dR = self._nodes[iR1,0,0,0]-self._nodes[iR0,0,0,0]
drho = (rho - self._nodes[iR0,0,0,0])
idx = np.nonzero(self._nodes[0,:,0,1] == theta)[0]
if idx.size > 0:
iT0, iT1 = idx[0], idx[0]
else:
idxl = np.nonzero(self._nodes[0,:,0,1] < theta)[0]
idxr = np.nonzero(self._nodes[0,:,0,1] > theta)[0]
if not np.any(idxl):
iT0, iT1 = 0, 0
elif not np.any(idxr):
iT0, iT1 = -1, -1
else:
iT0, iT1 = idxl[-1], idxr[0]
if iT0 == iT1:
dT, dtheta = 1, 0
else:
dT = self._nodes[0,iT1,0,1]-self._nodes[0,iT0,0,1]
dtheta = (theta - self._nodes[0,iT0,0,1])
idx = np.nonzero(self._nodes[0,0,:,2] == phi)[0]
if idx.size > 0:
iP0, iP1 = idx[0], idx[0]
else:
idxl = np.nonzero(self._nodes[0,0,:,2] < phi)[0]
idxr = np.nonzero(self._nodes[0,0,:,2] > phi)[0]
if not np.any(idxl):
iP0, iP1 = 0, 0
elif not np.any(idxr):
iP0, iP1 = -1, -1
else:
iP0, iP1 = idxl[-1], idxr[0]
if iP0 == iP1:
dP, dphi = 1, 0
else:
dP = self._nodes[0,0,iP1,2]-self._nodes[0,0,iP0,2]
dphi = (phi - self._nodes[0,0,iP0,2])
V000 = VV[iR0,iT0,iP0]
V001 = VV[iR0,iT0,iP1]
V010 = VV[iR0,iT1,iP0]
V011 = VV[iR0,iT1,iP1]
V100 = VV[iR1,iT0,iP0]
V101 = VV[iR1,iT0,iP1]
V110 = VV[iR1,iT1,iP0]
V111 = VV[iR1,iT1,iP1]
V00 = V000 + (V100 - V000)*drho/dR
V01 = V001 + (V101 - V001)*drho/dR
V10 = V010 + (V110 - V010)*drho/dR
V11 = V011 + (V111 - V011)*drho/dR
V0 = V00 + (V10 - V00)*dtheta/dT
V1 = V01 + (V11 - V01)*dtheta/dT
V = V0 + (V1 - V0)*dphi/dP
return(V)
def regrid(self, R, T, P):
Vp = np.empty(shape=R.shape)
Vs = np.empty(shape=R.shape)
for store, phase, index in ((Vp, "Vp", 0), (Vs, "Vs", 1)):
for (ir, it, ip) in [(ir, it, ip) for ir in range(R.shape[0])
for it in range(T.shape[1])
for ip in range(P.shape[2])]:
r, theta, phi = R[ir, it, ip], T[ir, it, ip], P[ir, it, ip]
store[ir, it, ip] = self._get_V(r, theta, phi, phase)
self.values["Vp"] = Vp
self.values["Vs"] = Vs
self.nodes["r"], self.nodes["theta"], self.nodes["phi"] = R, T, P
self.nodes["nr"] = R.shape[0]
self.nodes["ntheta"] = T.shape[1]
self.nodes["nphi"] = P.shape[2]
self.nodes["dr"] = (R[:,0,0][-1] - R[:,0,0][0]) / (R.shape[0] - 1)
self.nodes["dtheta"] = (T[0,:,0][-1] - T[0,:,0][0]) / (T.shape[1] - 1)
self.nodes["dphi"] = (P[0,0,:][-1] - P[0,0,:][0]) / (P.shape[2] - 1)
self.nodes["r_min"], self.nodes["r_max"] = np.min(R), np.max(R)
self.nodes["theta_min"], self.nodes["theta_max"] = np.min(T), np.max(T)
self.nodes["phi_min"], self.nodes["phi_max"] = np.min(P), np.max(P)
def regularize(self, nr, ntheta, nphi):
R, T, P = np.meshgrid(np.linspace(self.nodes["r_min"],
self.nodes["r_max"],
nr),
np.linspace(self.nodes["theta_min"],
self.nodes["theta_max"],
ntheta),
np.linspace(self.nodes["phi_min"],
self.nodes["phi_max"],
nphi),
indexing="ij")
self.regrid(R, T, P)
[docs] def slice(self, phase, lat0, lon0, azimuth, length, dmin, dmax, nx, nd):
"""
Return a vertical slice from velocity model with
section-parallel offset and depth coordinates.
:param str phase: phase velocity to retrieve
:param float lat0: latitude of center point of section trace
**{Units:** *degrees*, **Range:** *[-90, 90]*\
**}**
:param float lon0: longitude of center point of section trace
**{Units:** *degrees*, **Range:** *[-180,
180]*\ **}**
:param float azimuth: azimuth of section trace **{Units:**
*degrees*, **Range:** *(-inf, inf)*\ **}**
:param float length: length of section trace **{Units:**
*degrees*\ **}**
:param float dmin: minimum depth of section **{Units:** *km*\
**}**
:param float dmax: maximum depth of section **{Units:** *km*\
**}**
:param int nx: number of nodes in section-parallel direction
:param int nd: number of nodes in depth
:returns: phase velocity values and coordinates of vertical
section
:rtype: (`numpy.ndarray`, `numpy.ndarray`, `numpy.ndarray`)
.. code-block:: python
import matplotlib.pyplot as plt
topo = seispy.topography.Topography("data/anza.xyz")
vm = seispy.velocity.VelocityModel("data/vmodel.dat",
"FANG",
topo=topo)
X, Y, V = vm.slice("P", 33.5, -116.0, 302, 150/111, -5, 25, 100, 100)
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
im = ax.pcolormesh(X, Y, V, cmap=plt.get_cmap("hsv"))
ax.set_xlabel("Section-parallel offset [degrees]")
ax.set_ylabel("Depth [km]")
ax.invert_yaxis()
cbar = ax.get_figure().colorbar(im, ax=ax)
cbar.set_label("Vp [km/s]")
plt.show()
.. image:: VelocityModel.png
"""
(lon1, lat1), (lon2, lat2) = _geometry.get_line_endpoints(lat0,
lon0,
azimuth,
length)
DEPTH = np.linspace(dmin, dmax, nd)
LAT = np.linspace(lat1, lat2, nx)
LON = np.linspace(lon1, lon2, nx)
V = np.empty(shape=(len(DEPTH), len(LAT)))
X = np.empty(shape=V.shape)
Y = np.empty(shape=V.shape)
for i in range(nx):
for j in range(nd):
lat, lon, depth = LAT[i], LON[i], DEPTH[j]
V[i, j] = self(phase, lat, lon, depth)
X[i, j] = _geometry.distance((lat1, lon1), (lat, lon))
Y[i, j] = depth
V = np.ma.masked_equal(V, -1)
return(X, Y, V)
def _verify_phase(phase: str)->str:
if phase.upper() == "P" or phase.upper() == "VP":
phase = "P"
elif phase.upper() == "S" or phase.upper() == "VS":
phase = "S"
else:
raise(ValueError("invalid phase type - {}".format(phase)))
return(phase)
def test():
#vm = VelocityModel("/Users/malcolcw/Projects/Wavefront/examples/example2/vgrids.in", "fmm3d")
#grid = vm.v_type_grids[1][1]["grid"]
#print(vm(1, 3, 0.5, 0.5, 0))
#vm.v_type_grids[1][1]
#print(v("Vp", 33.0, -116.9, 3.0))
vm = VelocityModel("/Users/malcolcw/Projects/Shared/Velocity/FANG2016/original/VpVs.dat", "fang")
with open("/Users/malcolcw/Projects/Wavefront/pywrap/example5/vgrids.in", "w") as outf:
outf.write(str(vm))
if __name__ == "__main__":
test()
print("velocity.py is not an executable script.")
exit()