Encontrar rangos con Numpy
Cuando trabajamos con datos, en muchas ocasiones tenemos que localizar rangos de valores que cumplan ciertas condiciones, normalmente sobrepasar un cierto umbral.
Si nos ponemos a teclear sin pensar usaremos un bucle. Esta opción puede ser la óptima en muchos lenguajes pero, si usamos python, probablmente sea más eficiente usar numpy.
Buscando rangos con numpy
Para encontrar los rangos por los que nos preguntábamos inicialmente primero tenemos que filtrar:
import numpy as np data = np.cos(np.linspace(0, 10*np.pi, 1000)) threshold = 0 idxs = data > threshold
Con la línea data > threshold
obtenemos un vector lógico, el cual nos
indica si cada valor sobrepasa (True
) o no (False
) el umbral. Para
entender cómo funciona esta operación hay que tener claro el concepto de
broadcasting
.
Esta técnica se usa mucho en numpy y otros paquetes derivados, por lo
que es interesante conocerla. Aunque en este ejemplo es evidente qué es
lo ocurre tras bambalinas, en otros casos su aplicación es de todo menos
trivial.
Una vez tenemos qué índices están por encima y cuáles por debajo (o son iguales) de nuestro umbral, vamos a intentar conocer en qué puntos pasamos de un lado a otro, esto es, qué puntos nos delimitan los distintos rangos en que podemos nuestra serie de datos original.
ls = np.nonzero(idxs[:-1] != idxs[1:])
También podemos usar np.where(...)
.
In [24]: %timeit np.nonzero(idxs[:-1] != idxs[1:]) 5.98 µs ± 19.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each) In [25]: %timeit np.where(idxs[:-1] != idxs[1:]) 5.45 µs ± 84.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
Ahora ya tenemos los índices del vector con los datos que estamos estudiando en los que se producen los cruces del valor límite.
In [22]: ls = np.nonzero(idxs[:-1] != idxs[1:]) Out[22]: (array([ 49, 149, 249, 349, 449, 549, 649, 749, 849, 949]),) In [23]: data[149] Out[23]: -0.026727084775504988 In [24]: data[150] Out[24]: 0.004717088593031313 In [26]: data[49] Out[26]: 0.02987056142625339 In [27]: data[50] Out[27]: -0.001572368047584414
Es importante darse cuenta que, obviamente, y como podemos ver en las salidas del ejemplo anterior, detectamos tanto los índices en que pasamos el umbral tanto de abajo a arriba como de arriba hacia abajo. También es importante entender cómo hemos logrado detectar dichos índices. Veamos como funciona el código hasta ahora con un ejemplo manejable.
data = [2, 3, 2, 4, 5, 4, 3, 5, 2, 3] operación --------- idxs = data > 3 output: ------- [ 2 > 3 [ False 3 > 3 False 2 > 3 False 4 > 3 True 5 > 3 => True 4 > 3 True 3 > 3 False 5 > 3 True 2 > 3 False 3 > 3 ] False ] operación --------- ENCUENTRA ÍNDICES DONDE idxs[todos los valores salvo el último] SON DISTINTOS DE idxs[todos los valores salvo el primero] output: ------- [ ----- 0. [ False False 1. False False 2. False True 3. True True Where 4. True != True ======> (2, 5, 6, 7) 5. True False 6. False True 7. True False 8. False False ] ----- ]
Ahora que tenemos los índices, para obtener los rangos como parejas de
valores podemos o iterar sobre la colección ls=o usar la función =zip
.
In [39]: ls = ls[0] In [41]: zip(ls[:-1], ls[1:]) Out[41]: <zip at 0x129d2c9c8> In [42]: list(zip(ls[:-1], ls[1:])) Out[42]: [(49, 149), (149, 249), (249, 349), (349, 449), (449, 549), (549, 649), (649, 749), (749, 849), (849, 949)]
Hay que ser consciente que los rangos están definidos de forma que el primer valor es el índice previo al inicio del rango que cumple nuestra condición, mientras que el segundo es el último valor dentro del rango. En función de esto, tendremos que incrementar o el primer o el segundo valor del par, según si queremos que el rango defina solamente los valores que cumplen la condición, o bien los valores externos al rango de valores que cumplen dicha condición. Personalmente siempre trabajo con el primer caso.
Ahora ya podemos iterar sobre los pares anteriores y modificar la función original como necesitemos.
rs = [p for p in zip(ls[:-1]+1, ls[1:]+1) if data[p[0]] > 0] cp = data.copy() for pair in rs: np.put(cp, np.arange(*pair), 0)
En la primera línea filtramos aquellos pares que nos interesan, en
nuestro caso los que delimitan un rango de valores superiores a un
cierto umbral. Puede verse que en el segundo argumento de zip
sumamos
uno. Esto se debe a que la función np.arange
es exclusiva por la
derecha, por lo que he de incrementar el rango para que modifique el
último valor del mismo.
Después, copiamos los valores a un vector auxiliar para poder usar la
función put
sin destruir nuestros valores de origen. Esta función toma
el array a modificar, con los índices que delimitan el rango de valores
a sustituir, y otro array de la misma longitud que el anterior con los
valores que van a sustituir a los originales. Si pasamos un escalar,
otra vez Numpy usará el broadcasting
para rellenar todos los índices
con el mismo valor.
En vez de un escalar, podemos pasar una función la cual, a partir de los datos originales y los índices a modificar, nos devuelva un valor dependiente de los mismos.
Puesto todo junto nos queda un código bastante compacto:
def range_values_substitution(data, threshold, f=None, limit=0): # Podría no hacerse explícito, mejor así por claridad y # por evitar casos extremos. if f is None: f = threshold idxs = data > threshold ls = np.nonzero(idxs[:-1] != idxs[1:])[0] rs = [p for p in zip(ls[:-1]+1, ls[1:]+1) if data[p[0]] > threshold and p[1] - p[0] > limit] cp = data.copy() for pair in rs: np.put(cp, np.arange(*pair), f) return cp
Hemos modificado el filtrado que nos extrae los rangos que deseamos de
forma que, si el rango no tiene un ancho mínimo (p[1] - p[0] > limit
),
se descarte1.
La función anterior tiene un problema: descarta el primer y el último rango. Para evitarlo, podemos añadir un primer valor de signo contrario al primero valor real en la primera posición del array, y un último también de signo contrario al último valor, y procesar este nuevo vector.
def range_values_substitution(data, threshold, f=None, limit=0): if f is None: f = threshold aux = np.insert(data, [0, len(data)], [-data[0], -data[-1]]) idxs = aux > threshold ls = np.nonzero(idxs[:-1] != idxs[1:])[0] rs = [p for p in zip(ls[:-1]+1, ls[1:]+1) if aux[p[0]] > threshold and p[1] - p[0] > limit] cp = aux.copy() for pair in rs: np.put(cp, np.arange(*pair), f) return cp[1:-1]
Añadir valores a arrays de Numpy no es una operación muy eficiente. De todas formas, y al ser una operación puntual, salvo que vayamos muy justos de memoria, no nos perjudica en exceso, aunque hay que ser conscientes de que se empeora de forma sustancial el rendimiento. Podemos verlo en el siguiente ejemplo, donde aparecen los tiempos de ejecución si procesamos un vector con un millón de elementos:
# Versión original sin valores añadidos 11.2 ms ± 476 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) # Versión con valores añadidos al inicio y al final 16.5 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
Aunque estamos de enhorabuena, porque nos podemos evitar tener que
modificarlo: nuestra función nos da todos los rangos por encima y por
debajo de un cierto umbral. Por lo tanto, nos segmenta todo el vector en
secciones superiores e inferiores a dicho valor. Sabiendo esto, podemos
simplemente añadir al vector que define los límites de los rangos los
índices extremos para que se nos puedan crear los rangos iniciales y
finales. Con esto, y haciendo la suma fuera del zip
, llegamos a una
versión ligeramente distinta de la última que presentamos:
def range_values_substitution(data, threshold, f=None, limit=0): if f is None: f = threshold idxs = data > threshold ls = list(np.nonzero(idxs[:-1] != idxs[1:])[0] + 1) ls = np.array([0] + ls + [data.size-1]) rs = [p for p in zip(ls[:-1], ls[1:]) if data[p[0]] > threshold and p[1] - p[0] > limit] cp = data.copy() for pair in rs: np.put(cp, np.arange(*pair), f) return cp[1:-1]
con un tiempo de ejecución bastante mejorado:
12.5 ms ± 294 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Puede parecer extraño hacer:
ls = list(np.nonzero(idxs[:-1] != idxs[1:])[0] + 1) ls = np.array([0] + ls + [data.size-1])
Pero justo por lo dicho anteriormente, es mejor pasar el array a una
lista, concatenarla (O(1)), y volver a crear un np.ndarray
. Seguimos
creando uno, pero con insert
se creaban dos arrays temporales, lo que
explica la diferencia en el tiempo de ejecución2. Las pruebas de
rendimiento así lo demuestran.
Otras opciones
Si como variante queremos obtener un único rango entre el máximo y el mínimo valor, la función np.ptp
es justo lo que buscamos. Por otra parte np.clip
nos pone los valores por encima y debajo de un mínimo y un máximo justo en dichos valores, suficiente para el ejemplo que hemos presentado. Pero para sustituir por valores calculados de forma más compleja, o con el condicionante de rangos de un determinado tamaño, se nos queda corta.
Justo lo que hemos presentado se hace, de una forma en mi opinión un poco críptica, en este hilo de StackOverflow. Haciendo un par de pequeños ajustes a lo que allí tenemos nos queda:
def range_values_substitution_so(data, threshold, f=None, limit=0, st_inc=False): if f is None: f = threshold def islandinfo(y, trigger_val, stopind_inclusive=True): y_ext = np.r_[False, y>trigger_val, False] idx = np.flatnonzero(y_ext[:-1] != y_ext[1:]) lens = idx[1::2] - idx[:-1:2] return zip(idx[:-1:2], idx[1::2]-int(stopind_inclusive)), lens cp = data.copy() rs = [p for p in islandinfo(data, threshold, st_inc)[0] if p[1] - p[0] > limit] for pair in rs: np.put(cp, np.arange(*pair), f) return cp
Esta forma sí detecta sin necesidad de modificar el array los rangos ubicados en los extremos, aunque no mejora el rendimiento de nuestra última versión.
12.9 ms ± 689 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Por último ya, siempre que no necesitemos mantener los datos originales, si el put
lo hacemos sobre data
nos ahorraremos la copia, con lo que conseguiremos reducir aún más el tiempo de ejecución.
Conclusión
La forma que hemos presentado, aunque ha necesitado algunos ajustes, es bastante robusta3, permite obtener y modificar los rangos que deseemos en arrays de millones de elementos de forma rápida y, sobretodo, es bastante clara y legible.
Notas
Aunque pueda parecer una funcionalidad inútil, en ocasiones en el trabajo la hemos gastado para poder descartar solo rangos más anchos que un cierto rango de tiempo. Superar dicho rango de tiempo nos indicaba que los valores en ese se estaban generados por razones distintas a aquellas que queríamos estudiar. Al descartar estos rangos y sustituir los valores originales por otros no anómalos, en nuestro caso conseguíamos acelerar el procesamiento posterior.
No soy tampoco capaz de asegurar que esta sea la razón al 100%.
Al menos en los casos en los que se ha usado hasta ahora.