Source code for libfmp.c7.c7s2_audio_matching

"""
Module: libfmp.c7.c7s2_audio_matching
Author: Meinard Mueller, Frank Zalkow
License: The MIT license, https://opensource.org/licenses/MIT

This file is part of the FMP Notebooks (https://www.audiolabs-erlangen.de/FMP)
"""
import numpy as np
from matplotlib import patches
from numba import jit
import scipy
import librosa

import libfmp.c3
import libfmp.c7


[docs]def quantize_matrix(C, quant_fct=None): """Quantize matrix values in a logarithmic manner (as done for CENS features) Notebook: C7/C7S2_CENS.ipynb Args: C (np.ndarray): Input matrix quant_fct (list): List specifying the quantization function (Default value = None) Returns: C_quant (np.ndarray): Output matrix """ C_quant = np.empty_like(C) if quant_fct is None: quant_fct = [(0.0, 0.05, 0), (0.05, 0.1, 1), (0.1, 0.2, 2), (0.2, 0.4, 3), (0.4, 1, 4)] for min_val, max_val, target_val in quant_fct: mask = np.logical_and(min_val <= C, C < max_val) C_quant[mask] = target_val return C_quant
[docs]def compute_cens_from_chromagram(C, Fs=1, ell=41, d=10, quant=True): """Compute CENS features from chromagram Notebook: C7/C7S2_CENS.ipynb Args: C (np.ndarray): Input chromagram Fs (scalar): Feature rate of chromagram (Default value = 1) ell (int): Smoothing length (Default value = 41) d (int): Downsampling factor (Default value = 10) quant (bool): Apply quantization (Default value = True) Returns: C_CENS (np.ndarray): CENS features Fs_CENS (scalar): Feature rate of CENS features """ C_norm = libfmp.c3.normalize_feature_sequence(C, norm='1') C_Q = quantize_matrix(C_norm) if quant else C_norm C_smooth, Fs_CENS = libfmp.c3.smooth_downsample_feature_sequence(C_Q, Fs, filt_len=ell, down_sampling=d, w_type='hann') C_CENS = libfmp.c3.normalize_feature_sequence(C_smooth, norm='2') return C_CENS, Fs_CENS
[docs]def scale_tempo_sequence(X, factor=1): """Scales a sequence (given as feature matrix) along time (second dimension) Notebook: C7/C7S2_DiagonalMatching.ipynb Args: X (np.ndarray): Feature sequences (given as K x N matrix) factor (float): Scaling factor (resulting in length "round(factor * N)"") (Default value = 1) Returns: X_new (np.ndarray): Scaled feature sequence N_new (int): Length of scaled feature sequence """ N = X.shape[1] t = np.linspace(0, 1, num=N, endpoint=True) N_new = np.round(factor * N).astype(int) t_new = np.linspace(0, 1, num=N_new, endpoint=True) X_new = scipy.interpolate.interp1d(t, X, axis=1)(t_new) return X_new, N_new
[docs]def cost_matrix_dot(X, Y): """Computes cost matrix via dot product Notebook: C7/C7S2_DiagonalMatching.ipynb Args: X (np.ndarray): First sequence (K x N matrix) Y (np.ndarray): Second sequence (K x M matrix) Returns: C (np.ndarray): Cost matrix """ return 1 - np.dot(X.T, Y)
[docs]def matching_function_diag(C, cyclic=False): """Computes diagonal matching function Notebook: C7/C7S2_DiagonalMatching.ipynb Args: C (np.ndarray): Cost matrix cyclic (bool): If "True" then matching is done cyclically (Default value = False) Returns: Delta (np.ndarray): Matching function """ N, M = C.shape assert N <= M, "N <= M is required" Delta = C[0, :] for n in range(1, N): Delta = Delta + np.roll(C[n, :], -n) Delta = Delta / N if cyclic is False: Delta[M-N+1:M] = np.inf return Delta
[docs]def mininma_from_matching_function(Delta, rho=2, tau=0.2, num=None): """Derives local minima positions of matching function in an iterative fashion Notebook: C7/C7S2_DiagonalMatching.ipynb Args: Delta (np.ndarray): Matching function rho (int): Parameter to exclude neighborhood of a matching position for subsequent matches (Default value = 2) tau (float): Threshold for maximum Delta value allowed for matches (Default value = 0.2) num (int): Maximum number of matches (Default value = None) Returns: pos (np.ndarray): Array of local minima """ Delta_tmp = Delta.copy() M = len(Delta) pos = [] num_pos = 0 rho = int(rho) if num is None: num = M while num_pos < num and np.sum(Delta_tmp < tau) > 0: m = np.argmin(Delta_tmp) pos.append(m) num_pos += 1 Delta_tmp[max(0, m - rho):min(m + rho, M)] = np.inf pos = np.array(pos).astype(int) return pos
[docs]def matches_diag(pos, Delta_N): """Derives matches from positions in the case of diagonal matching Notebook: C7/C7S2_DiagonalMatching.ipynb Args: pos (np.ndarray or list): Starting positions of matches Delta_N (int or np.ndarray or list): Length of match (a single number or a list of same length as Delta) Returns: matches (np.ndarray): Array containing matches (start, end) """ matches = np.zeros((len(pos), 2)).astype(int) for k in range(len(pos)): s = pos[k] matches[k, 0] = s if isinstance(Delta_N, int): matches[k, 1] = s + Delta_N - 1 else: matches[k, 1] = s + Delta_N[s] - 1 return matches
[docs]def plot_matches(ax, matches, Delta, Fs=1, alpha=0.2, color='r', s_marker='o', t_marker=''): """Plots matches into existing axis Notebook: C7/C7S2_DiagonalMatching.ipynb Args: ax: Axis matches: Array of matches (start, end) Delta: Matching function Fs: Feature rate (Default value = 1) alpha: Transparency pramaeter for match visualization (Default value = 0.2) color: Color used to indicated matches (Default value = 'r') s_marker: Marker used to indicate start of matches (Default value = 'o') t_marker: Marker used to indicate end of matches (Default value = '') """ y_min, y_max = ax.get_ylim() for (s, t) in matches: ax.plot(s/Fs, Delta[s], color=color, marker=s_marker, linestyle='None') ax.plot(t/Fs, Delta[t], color=color, marker=t_marker, linestyle='None') rect = patches.Rectangle(((s-0.5)/Fs, y_min), (t-s+1)/Fs, y_max, facecolor=color, alpha=alpha) ax.add_patch(rect)
[docs]def matching_function_diag_multiple(X, Y, tempo_rel_set=[1], cyclic=False): """Computes diagonal matching function using multiple query strategy Notebook: C7/C7S2_DiagonalMatching.ipynb Args: X (np.ndarray): First sequence (K x N matrix) Y (np.ndarray): Second sequence (K x M matrix) tempo_rel_set (np.ndarray): Set of relative tempo values (scaling) (Default value = [1]) cyclic (bool): If "True" then matching is done cyclically (Default value = False) Returns: Delta_min (np.ndarray): Matching function (obtained by from minimizing over several matching functions) Delta_N (np.ndarray): Query length of best match for each time position Delta_scale (np.ndarray): Set of matching functions (for each of the scaled versions of the query) """ M = Y.shape[1] num_tempo = len(tempo_rel_set) Delta_scale = np.zeros((num_tempo, M)) N_scale = np.zeros(num_tempo) for k in range(num_tempo): X_scale, N_scale[k] = scale_tempo_sequence(X, factor=tempo_rel_set[k]) C_scale = cost_matrix_dot(X_scale, Y) Delta_scale[k, :] = matching_function_diag(C_scale, cyclic=cyclic) Delta_min = np.min(Delta_scale, axis=0) Delta_argmin = np.argmin(Delta_scale, axis=0) Delta_N = N_scale[Delta_argmin] return Delta_min, Delta_N, Delta_scale
[docs]@jit(nopython=True) def compute_accumulated_cost_matrix_subsequence_dtw(C): """Given the cost matrix, compute the accumulated cost matrix for subsequence dynamic time warping with step sizes {(1, 0), (0, 1), (1, 1)} Notebook: C7/C7S2_SubsequenceDTW.ipynb Args: C (np.ndarray): Cost matrix Returns: D (np.ndarray): Accumulated cost matrix """ N, M = C.shape D = np.zeros((N, M)) D[:, 0] = np.cumsum(C[:, 0]) D[0, :] = C[0, :] for n in range(1, N): for m in range(1, M): D[n, m] = C[n, m] + min(D[n-1, m], D[n, m-1], D[n-1, m-1]) return D
[docs]@jit(nopython=True) def compute_optimal_warping_path_subsequence_dtw(D, m=-1): """Given an accumulated cost matrix, compute the warping path for subsequence dynamic time warping with step sizes {(1, 0), (0, 1), (1, 1)} Notebook: C7/C7S2_SubsequenceDTW.ipynb Args: D (np.ndarray): Accumulated cost matrix m (int): Index to start back tracking; if set to -1, optimal m is used (Default value = -1) Returns: P (np.ndarray): Optimal warping path (array of index pairs) """ N, M = D.shape n = N - 1 if m < 0: m = D[N - 1, :].argmin() P = [(n, m)] while n > 0: if m == 0: cell = (n - 1, 0) else: val = min(D[n-1, m-1], D[n-1, m], D[n, m-1]) if val == D[n-1, m-1]: cell = (n-1, m-1) elif val == D[n-1, m]: cell = (n-1, m) else: cell = (n, m-1) P.append(cell) n, m = cell P.reverse() P = np.array(P) return P
[docs]@jit(nopython=True) def compute_accumulated_cost_matrix_subsequence_dtw_21(C): """Given the cost matrix, compute the accumulated cost matrix for subsequence dynamic time warping with step sizes {(1, 1), (2, 1), (1, 2)} Notebook: C7/C7S2_SubsequenceDTW.ipynb Args: C (np.ndarray): Cost matrix Returns: D (np.ndarray): Accumulated cost matrix """ N, M = C.shape D = np.zeros((N + 1, M + 2)) D[0:1, :] = np.inf D[:, 0:2] = np.inf D[1, 2:] = C[0, :] for n in range(1, N): for m in range(0, M): if n == 0 and m == 0: continue D[n+1, m+2] = C[n, m] + min(D[n-1+1, m-1+2], D[n-2+1, m-1+2], D[n-1+1, m-2+2]) D = D[1:, 2:] return D
[docs]@jit(nopython=True) def compute_optimal_warping_path_subsequence_dtw_21(D, m=-1): """Given an accumulated cost matrix, compute the warping path for subsequence dynamic time warping with step sizes {(1, 1), (2, 1), (1, 2)} Notebook: C7/C7S2_SubsequenceDTW.ipynb Args: D (np.ndarray): Accumulated cost matrix m (int): Index to start back tracking; if set to -1, optimal m is used (Default value = -1) Returns: P (np.ndarray): Optimal warping path (array of index pairs) """ N, M = D.shape n = N - 1 if m < 0: m = D[N - 1, :].argmin() P = [(n, m)] while n > 0: if m == 0: cell = (n-1, 0) else: val = min(D[n-1, m-1], D[n-2, m-1], D[n-1, m-2]) if val == D[n-1, m-1]: cell = (n-1, m-1) elif val == D[n-2, m-1]: cell = (n-2, m-1) else: cell = (n-1, m-2) P.append(cell) n, m = cell P.reverse() P = np.array(P) return P
[docs]def compute_cens_from_file(fn_wav, Fs=22050, N=4410, H=2205, ell=21, d=5): """Compute CENS features from file Notebook: C7/C7S2_AudioMatching.ipynb Args: fn_wav (str): Filename of wav file Fs (scalar): Feature rate of wav file (Default value = 22050) N (int): Window size for STFT (Default value = 4410) H (int): Hop size for STFT (Default value = 2205) ell (int): Smoothing length (Default value = 21) d (int): Downsampling factor (Default value = 5) Returns: X_CENS (np.ndarray): CENS features L (int): Length of CENS feature sequence Fs_CENS (scalar): Feature rate of CENS features x_duration (float): Duration (seconds) of wav file """ x, Fs = librosa.load(fn_wav, sr=Fs) x_duration = x.shape[0] / Fs X_chroma = librosa.feature.chroma_stft(y=x, sr=Fs, tuning=0, norm=None, hop_length=H, n_fft=N) X_CENS, Fs_CENS = libfmp.c7.compute_cens_from_chromagram(X_chroma, Fs=Fs/H, ell=ell, d=d) L = X_CENS.shape[1] return X_CENS, L, Fs_CENS, x_duration
[docs]def compute_matching_function_dtw(X, Y, stepsize=2): """Compute CENS features from file Notebook: C7/C7S2_AudioMatching.ipynb Args: X (np.ndarray): Query feature sequence (given as K x N matrix) Y (np.ndarray): Database feature sequence (given as K x M matrix) stepsize (int): Parameter for step size condition (1 or 2) (Default value = 2) Returns: Delta (np.ndarray): DTW-based matching function C (np.ndarray): Cost matrix D (np.ndarray): Accumulated cost matrix """ C = libfmp.c7.cost_matrix_dot(X, Y) if stepsize == 1: D = libfmp.c7.compute_accumulated_cost_matrix_subsequence_dtw(C) if stepsize == 2: D = libfmp.c7.compute_accumulated_cost_matrix_subsequence_dtw_21(C) N, M = C.shape Delta = D[-1, :] / N return Delta, C, D
[docs]def matches_dtw(pos, D, stepsize=2): """Derives matches from positions for DTW-based strategy Notebook: C7/C7S2_AudioMatching.ipynb Args: pos (np.ndarray): End positions of matches D (np.ndarray): Accumulated cost matrix stepsize (int): Parameter for step size condition (1 or 2) (Default value = 2) Returns: matches (np.ndarray): Array containing matches (start, end) """ matches = np.zeros((len(pos), 2)).astype(int) for k in range(len(pos)): t = pos[k] matches[k, 1] = t if stepsize == 1: P = libfmp.c7.compute_optimal_warping_path_subsequence_dtw(D, m=t) if stepsize == 2: P = libfmp.c7.compute_optimal_warping_path_subsequence_dtw_21(D, m=t) s = P[0, 1] matches[k, 0] = s return matches
[docs]def compute_matching_function_dtw_ti(X, Y, cyc=np.arange(12), stepsize=2): """Compute transposition-invariant matching function Notebook: C7/C7S2_AudioMatching.ipynb Args: X (np.ndarray): Query feature sequence (given as K x N matrix) Y (np.ndarray): Database feature sequence (given as K x M matrix) cyc (np.nda(rray): Set of cyclic shift indices to be considered (Default value = np.arange(12)) stepsize (int): Parameter for step size condition (1 or 2) (Default value = 2) Returns: Delta_TI (np.ndarray): Transposition-invariant matching function Delta_ind (np.ndarray): Cost-minimizing indices Delta_cyc (np.ndarray): Array containing all matching functions """ M = Y.shape[1] num_cyc = len(cyc) Delta_cyc = np.zeros((num_cyc, M)) for k in range(num_cyc): X_cyc = np.roll(X, k, axis=0) Delta_cyc[k, :], C, D = compute_matching_function_dtw(X_cyc, Y, stepsize=stepsize) Delta_TI = np.min(Delta_cyc, axis=0) Delta_ind = np.argmin(Delta_cyc, axis=0) return Delta_TI, Delta_ind, Delta_cyc