"""
Module: libfmp.c7.c7s2_audio_matching
Author: Meinard Mueller, Frank Zalkow
License: The MIT license, https://opensource.org/licenses/MIT
This file is part of the FMP Notebooks (https://www.audiolabs-erlangen.de/FMP)
"""
import numpy as np
from matplotlib import patches
from numba import jit
import scipy
import librosa
import libfmp.c3
import libfmp.c7
[docs]def quantize_matrix(C, quant_fct=None):
"""Quantize matrix values in a logarithmic manner (as done for CENS features)
Notebook: C7/C7S2_CENS.ipynb
Args:
C (np.ndarray): Input matrix
quant_fct (list): List specifying the quantization function (Default value = None)
Returns:
C_quant (np.ndarray): Output matrix
"""
C_quant = np.empty_like(C)
if quant_fct is None:
quant_fct = [(0.0, 0.05, 0), (0.05, 0.1, 1), (0.1, 0.2, 2), (0.2, 0.4, 3), (0.4, 1, 4)]
for min_val, max_val, target_val in quant_fct:
mask = np.logical_and(min_val <= C, C < max_val)
C_quant[mask] = target_val
return C_quant
[docs]def compute_cens_from_chromagram(C, Fs=1, ell=41, d=10, quant=True):
"""Compute CENS features from chromagram
Notebook: C7/C7S2_CENS.ipynb
Args:
C (np.ndarray): Input chromagram
Fs (scalar): Feature rate of chromagram (Default value = 1)
ell (int): Smoothing length (Default value = 41)
d (int): Downsampling factor (Default value = 10)
quant (bool): Apply quantization (Default value = True)
Returns:
C_CENS (np.ndarray): CENS features
Fs_CENS (scalar): Feature rate of CENS features
"""
C_norm = libfmp.c3.normalize_feature_sequence(C, norm='1')
C_Q = quantize_matrix(C_norm) if quant else C_norm
C_smooth, Fs_CENS = libfmp.c3.smooth_downsample_feature_sequence(C_Q, Fs, filt_len=ell,
down_sampling=d, w_type='hann')
C_CENS = libfmp.c3.normalize_feature_sequence(C_smooth, norm='2')
return C_CENS, Fs_CENS
[docs]def scale_tempo_sequence(X, factor=1):
"""Scales a sequence (given as feature matrix) along time (second dimension)
Notebook: C7/C7S2_DiagonalMatching.ipynb
Args:
X (np.ndarray): Feature sequences (given as K x N matrix)
factor (float): Scaling factor (resulting in length "round(factor * N)"") (Default value = 1)
Returns:
X_new (np.ndarray): Scaled feature sequence
N_new (int): Length of scaled feature sequence
"""
N = X.shape[1]
t = np.linspace(0, 1, num=N, endpoint=True)
N_new = np.round(factor * N).astype(int)
t_new = np.linspace(0, 1, num=N_new, endpoint=True)
X_new = scipy.interpolate.interp1d(t, X, axis=1)(t_new)
return X_new, N_new
[docs]def cost_matrix_dot(X, Y):
"""Computes cost matrix via dot product
Notebook: C7/C7S2_DiagonalMatching.ipynb
Args:
X (np.ndarray): First sequence (K x N matrix)
Y (np.ndarray): Second sequence (K x M matrix)
Returns:
C (np.ndarray): Cost matrix
"""
return 1 - np.dot(X.T, Y)
[docs]def matching_function_diag(C, cyclic=False):
"""Computes diagonal matching function
Notebook: C7/C7S2_DiagonalMatching.ipynb
Args:
C (np.ndarray): Cost matrix
cyclic (bool): If "True" then matching is done cyclically (Default value = False)
Returns:
Delta (np.ndarray): Matching function
"""
N, M = C.shape
assert N <= M, "N <= M is required"
Delta = C[0, :]
for n in range(1, N):
Delta = Delta + np.roll(C[n, :], -n)
Delta = Delta / N
if cyclic is False:
Delta[M-N+1:M] = np.inf
return Delta
[docs]def mininma_from_matching_function(Delta, rho=2, tau=0.2, num=None):
"""Derives local minima positions of matching function in an iterative fashion
Notebook: C7/C7S2_DiagonalMatching.ipynb
Args:
Delta (np.ndarray): Matching function
rho (int): Parameter to exclude neighborhood of a matching position for subsequent matches (Default value = 2)
tau (float): Threshold for maximum Delta value allowed for matches (Default value = 0.2)
num (int): Maximum number of matches (Default value = None)
Returns:
pos (np.ndarray): Array of local minima
"""
Delta_tmp = Delta.copy()
M = len(Delta)
pos = []
num_pos = 0
rho = int(rho)
if num is None:
num = M
while num_pos < num and np.sum(Delta_tmp < tau) > 0:
m = np.argmin(Delta_tmp)
pos.append(m)
num_pos += 1
Delta_tmp[max(0, m - rho):min(m + rho, M)] = np.inf
pos = np.array(pos).astype(int)
return pos
[docs]def matches_diag(pos, Delta_N):
"""Derives matches from positions in the case of diagonal matching
Notebook: C7/C7S2_DiagonalMatching.ipynb
Args:
pos (np.ndarray or list): Starting positions of matches
Delta_N (int or np.ndarray or list): Length of match (a single number or a list of same length as Delta)
Returns:
matches (np.ndarray): Array containing matches (start, end)
"""
matches = np.zeros((len(pos), 2)).astype(int)
for k in range(len(pos)):
s = pos[k]
matches[k, 0] = s
if isinstance(Delta_N, int):
matches[k, 1] = s + Delta_N - 1
else:
matches[k, 1] = s + Delta_N[s] - 1
return matches
[docs]def plot_matches(ax, matches, Delta, Fs=1, alpha=0.2, color='r', s_marker='o', t_marker=''):
"""Plots matches into existing axis
Notebook: C7/C7S2_DiagonalMatching.ipynb
Args:
ax: Axis
matches: Array of matches (start, end)
Delta: Matching function
Fs: Feature rate (Default value = 1)
alpha: Transparency pramaeter for match visualization (Default value = 0.2)
color: Color used to indicated matches (Default value = 'r')
s_marker: Marker used to indicate start of matches (Default value = 'o')
t_marker: Marker used to indicate end of matches (Default value = '')
"""
y_min, y_max = ax.get_ylim()
for (s, t) in matches:
ax.plot(s/Fs, Delta[s], color=color, marker=s_marker, linestyle='None')
ax.plot(t/Fs, Delta[t], color=color, marker=t_marker, linestyle='None')
rect = patches.Rectangle(((s-0.5)/Fs, y_min), (t-s+1)/Fs, y_max, facecolor=color, alpha=alpha)
ax.add_patch(rect)
[docs]def matching_function_diag_multiple(X, Y, tempo_rel_set=[1], cyclic=False):
"""Computes diagonal matching function using multiple query strategy
Notebook: C7/C7S2_DiagonalMatching.ipynb
Args:
X (np.ndarray): First sequence (K x N matrix)
Y (np.ndarray): Second sequence (K x M matrix)
tempo_rel_set (np.ndarray): Set of relative tempo values (scaling) (Default value = [1])
cyclic (bool): If "True" then matching is done cyclically (Default value = False)
Returns:
Delta_min (np.ndarray): Matching function (obtained by from minimizing over several matching functions)
Delta_N (np.ndarray): Query length of best match for each time position
Delta_scale (np.ndarray): Set of matching functions (for each of the scaled versions of the query)
"""
M = Y.shape[1]
num_tempo = len(tempo_rel_set)
Delta_scale = np.zeros((num_tempo, M))
N_scale = np.zeros(num_tempo)
for k in range(num_tempo):
X_scale, N_scale[k] = scale_tempo_sequence(X, factor=tempo_rel_set[k])
C_scale = cost_matrix_dot(X_scale, Y)
Delta_scale[k, :] = matching_function_diag(C_scale, cyclic=cyclic)
Delta_min = np.min(Delta_scale, axis=0)
Delta_argmin = np.argmin(Delta_scale, axis=0)
Delta_N = N_scale[Delta_argmin]
return Delta_min, Delta_N, Delta_scale
[docs]@jit(nopython=True)
def compute_accumulated_cost_matrix_subsequence_dtw(C):
"""Given the cost matrix, compute the accumulated cost matrix for
subsequence dynamic time warping with step sizes {(1, 0), (0, 1), (1, 1)}
Notebook: C7/C7S2_SubsequenceDTW.ipynb
Args:
C (np.ndarray): Cost matrix
Returns:
D (np.ndarray): Accumulated cost matrix
"""
N, M = C.shape
D = np.zeros((N, M))
D[:, 0] = np.cumsum(C[:, 0])
D[0, :] = C[0, :]
for n in range(1, N):
for m in range(1, M):
D[n, m] = C[n, m] + min(D[n-1, m], D[n, m-1], D[n-1, m-1])
return D
[docs]@jit(nopython=True)
def compute_optimal_warping_path_subsequence_dtw(D, m=-1):
"""Given an accumulated cost matrix, compute the warping path for
subsequence dynamic time warping with step sizes {(1, 0), (0, 1), (1, 1)}
Notebook: C7/C7S2_SubsequenceDTW.ipynb
Args:
D (np.ndarray): Accumulated cost matrix
m (int): Index to start back tracking; if set to -1, optimal m is used (Default value = -1)
Returns:
P (np.ndarray): Optimal warping path (array of index pairs)
"""
N, M = D.shape
n = N - 1
if m < 0:
m = D[N - 1, :].argmin()
P = [(n, m)]
while n > 0:
if m == 0:
cell = (n - 1, 0)
else:
val = min(D[n-1, m-1], D[n-1, m], D[n, m-1])
if val == D[n-1, m-1]:
cell = (n-1, m-1)
elif val == D[n-1, m]:
cell = (n-1, m)
else:
cell = (n, m-1)
P.append(cell)
n, m = cell
P.reverse()
P = np.array(P)
return P
[docs]@jit(nopython=True)
def compute_accumulated_cost_matrix_subsequence_dtw_21(C):
"""Given the cost matrix, compute the accumulated cost matrix for
subsequence dynamic time warping with step sizes {(1, 1), (2, 1), (1, 2)}
Notebook: C7/C7S2_SubsequenceDTW.ipynb
Args:
C (np.ndarray): Cost matrix
Returns:
D (np.ndarray): Accumulated cost matrix
"""
N, M = C.shape
D = np.zeros((N + 1, M + 2))
D[0:1, :] = np.inf
D[:, 0:2] = np.inf
D[1, 2:] = C[0, :]
for n in range(1, N):
for m in range(0, M):
if n == 0 and m == 0:
continue
D[n+1, m+2] = C[n, m] + min(D[n-1+1, m-1+2], D[n-2+1, m-1+2], D[n-1+1, m-2+2])
D = D[1:, 2:]
return D
[docs]@jit(nopython=True)
def compute_optimal_warping_path_subsequence_dtw_21(D, m=-1):
"""Given an accumulated cost matrix, compute the warping path for
subsequence dynamic time warping with step sizes {(1, 1), (2, 1), (1, 2)}
Notebook: C7/C7S2_SubsequenceDTW.ipynb
Args:
D (np.ndarray): Accumulated cost matrix
m (int): Index to start back tracking; if set to -1, optimal m is used (Default value = -1)
Returns:
P (np.ndarray): Optimal warping path (array of index pairs)
"""
N, M = D.shape
n = N - 1
if m < 0:
m = D[N - 1, :].argmin()
P = [(n, m)]
while n > 0:
if m == 0:
cell = (n-1, 0)
else:
val = min(D[n-1, m-1], D[n-2, m-1], D[n-1, m-2])
if val == D[n-1, m-1]:
cell = (n-1, m-1)
elif val == D[n-2, m-1]:
cell = (n-2, m-1)
else:
cell = (n-1, m-2)
P.append(cell)
n, m = cell
P.reverse()
P = np.array(P)
return P
[docs]def compute_cens_from_file(fn_wav, Fs=22050, N=4410, H=2205, ell=21, d=5):
"""Compute CENS features from file
Notebook: C7/C7S2_AudioMatching.ipynb
Args:
fn_wav (str): Filename of wav file
Fs (scalar): Feature rate of wav file (Default value = 22050)
N (int): Window size for STFT (Default value = 4410)
H (int): Hop size for STFT (Default value = 2205)
ell (int): Smoothing length (Default value = 21)
d (int): Downsampling factor (Default value = 5)
Returns:
X_CENS (np.ndarray): CENS features
L (int): Length of CENS feature sequence
Fs_CENS (scalar): Feature rate of CENS features
x_duration (float): Duration (seconds) of wav file
"""
x, Fs = librosa.load(fn_wav, sr=Fs)
x_duration = x.shape[0] / Fs
X_chroma = librosa.feature.chroma_stft(y=x, sr=Fs, tuning=0, norm=None, hop_length=H, n_fft=N)
X_CENS, Fs_CENS = libfmp.c7.compute_cens_from_chromagram(X_chroma, Fs=Fs/H, ell=ell, d=d)
L = X_CENS.shape[1]
return X_CENS, L, Fs_CENS, x_duration
[docs]def compute_matching_function_dtw(X, Y, stepsize=2):
"""Compute CENS features from file
Notebook: C7/C7S2_AudioMatching.ipynb
Args:
X (np.ndarray): Query feature sequence (given as K x N matrix)
Y (np.ndarray): Database feature sequence (given as K x M matrix)
stepsize (int): Parameter for step size condition (1 or 2) (Default value = 2)
Returns:
Delta (np.ndarray): DTW-based matching function
C (np.ndarray): Cost matrix
D (np.ndarray): Accumulated cost matrix
"""
C = libfmp.c7.cost_matrix_dot(X, Y)
if stepsize == 1:
D = libfmp.c7.compute_accumulated_cost_matrix_subsequence_dtw(C)
if stepsize == 2:
D = libfmp.c7.compute_accumulated_cost_matrix_subsequence_dtw_21(C)
N, M = C.shape
Delta = D[-1, :] / N
return Delta, C, D
[docs]def matches_dtw(pos, D, stepsize=2):
"""Derives matches from positions for DTW-based strategy
Notebook: C7/C7S2_AudioMatching.ipynb
Args:
pos (np.ndarray): End positions of matches
D (np.ndarray): Accumulated cost matrix
stepsize (int): Parameter for step size condition (1 or 2) (Default value = 2)
Returns:
matches (np.ndarray): Array containing matches (start, end)
"""
matches = np.zeros((len(pos), 2)).astype(int)
for k in range(len(pos)):
t = pos[k]
matches[k, 1] = t
if stepsize == 1:
P = libfmp.c7.compute_optimal_warping_path_subsequence_dtw(D, m=t)
if stepsize == 2:
P = libfmp.c7.compute_optimal_warping_path_subsequence_dtw_21(D, m=t)
s = P[0, 1]
matches[k, 0] = s
return matches
[docs]def compute_matching_function_dtw_ti(X, Y, cyc=np.arange(12), stepsize=2):
"""Compute transposition-invariant matching function
Notebook: C7/C7S2_AudioMatching.ipynb
Args:
X (np.ndarray): Query feature sequence (given as K x N matrix)
Y (np.ndarray): Database feature sequence (given as K x M matrix)
cyc (np.nda(rray): Set of cyclic shift indices to be considered (Default value = np.arange(12))
stepsize (int): Parameter for step size condition (1 or 2) (Default value = 2)
Returns:
Delta_TI (np.ndarray): Transposition-invariant matching function
Delta_ind (np.ndarray): Cost-minimizing indices
Delta_cyc (np.ndarray): Array containing all matching functions
"""
M = Y.shape[1]
num_cyc = len(cyc)
Delta_cyc = np.zeros((num_cyc, M))
for k in range(num_cyc):
X_cyc = np.roll(X, k, axis=0)
Delta_cyc[k, :], C, D = compute_matching_function_dtw(X_cyc, Y, stepsize=stepsize)
Delta_TI = np.min(Delta_cyc, axis=0)
Delta_ind = np.argmin(Delta_cyc, axis=0)
return Delta_TI, Delta_ind, Delta_cyc