GCP y correlación de miles de datos financieros en tiempo real

Correlaciona miles de transmisiones de series temporales financieras en tiempo real

En este artículo, se describe el uso de Dataflow, la tecnología de procesamiento de transmisiones distribuida de Google, para compilar un sistema de Analítica en tiempo real que puede escalar desde algunas transmisiones de datos simultáneas de instrumentos financieros sin cambios, administración o infraestructura de trabajo.


La solución brinda datos exactos, completos y de coherencia sólida sin necesidad de usar el procesamiento por lotes o microlotes. A pesar de que esta implementación usa análisis de correlación, puedes generalizar los patrones de procesamiento que demuestra para otros usos, como el envío de señales de los dispositivos de IoT desde equipos en una fábrica.


La seguridad siempre es importante cuando trabajas con datos financieros. Google Cloud te ayuda a mantener tus datos protegidos, seguros y privados de diferentes maneras. Por ejemplo, todos los datos se encriptan durante la transmisión y cuando están en reposo, y Google Cloud cumple con ISO 27001, SOC3, FINRA y PCI.


Nota: Esta información es solamente de ejemplo.  No la utilices para tomar decisiones de inversión.

Hoy en día, la mayor parte del procesamiento de datos es procesamiento por lotes, que se lleva a cabo en una instantánea de datos congelados en el tiempo. Las instantáneas se procesan todas de una vez y los resultados se generan en ciclos, lo que funciona bien en muchos casos prácticos. Sin embargo, otros casos prácticos exigen resultados exactos, pero con baja latencia desde fuentes basadas en transmisión, en las que los datos y sus horas de evento correspondientes se transforman en una tupla para usar en el procesamiento. Las herramientas como Dataflow facilitan la producción de análisis rápidos y precisos para que puedas concentrarte en la lógica empresarial, en lugar de implementar las primitivas del procesamiento de transmisión.

En esta solución, se ilustra un caso práctico que aborda los desafíos que enfrenta un banco líder. El caso práctico reúne miles de transmisiones en vivo de datos financieros para aplicar los procesamientos de correlación de Pearson. Las correlaciones te permiten explorar la relación entre dos series temporales. Un ejemplo común es la relación entre las ventas de paraguas y las ventas de helado. Trazados en una gráfica uno contra el otro, suelen moverse en direcciones opuestas. Los cálculos de correlación en masa permiten a los comerciantes identificar correlaciones inusuales o inversas, con las que pueden comerciar.

Puedes encontrar el código para la solución en GitHub en https://github.com/GoogleCloudPlatform/data-timeseries-java.

Agrupación en lotes


La definición de este término depende de los detalles de la implementación. Algunos sistemas exponen una agrupación por lotes subyacente y la definen como sistema de ventanas, en cuyo caso significa sistema de ventanas del tiempo de procesamiento, una variante menos útil en términos de semántica que el sistema de ventanas de horas del evento que se analiza en este artículo.


En un sistema con suficiente flexibilidad, puedes combinar la potencia semántica del sistema de ventanas de horas del evento con la baja latencia de un motor de transmisión en vivo, ya que esta solución se demuestra con Dataflow.


Para obtener una explicación más detallada, consulta los excelentes blogs de Tyler Akidau: The World Beyond Batch Streaming 101 (“El mundo más allá de los lotes: introducción a la transmisión”) yThe World Beyond Batch Streaming 102 (“El mundo más allá de los lotes: más información acerca de la transmisión”).


Sistema de ventanas


Este término se refiere al agregado de elementos en el contexto de su tiempo de evento, es decir, cuando sucedieron en realidad. La transmisión en vivo y los sistemas de microlotes pueden realizar un sistema de ventanas, a pesar de que los microlotes introducen latencia adicional.


Ventana operativa


Este término se refiere a un solo panel dentro de una ventana deslizante, como se ilustra en el siguiente diagrama.

Figura 1. El término operating window (ventana operativa) se refiere a un solo panel dentro de una ventana deslizante.


En la Figura 1, el eje x contiene referencias a tres series temporales, TS-1, TS-2 y TS-3. El eje y muestra fragmentos de tiempo discretos, definidos por t0 hasta t5. Las ventanas operativas 01 y 02 cubren diferentes fragmentos de tiempo en un modo de ventana deslizante. Los datos t2 a t3 se comparten entre las ventanas operativas 01 y 02.


Vela


Este término deriva del Gráfico de velas que se usa, por lo general, para representar series temporales financieras. Cada vela contiene información para las agregaciones siguientes dentro de un fragmento de tiempo dado:


  • El valor de apertura dentro de un fragmento de tiempo o el valor de cierre del fragmento de tiempo anterior.
  • El valor de cierre dentro de un fragmento de tiempo.
  • El valor mínimo dentro de un fragmento de tiempo.
  • El valor máximo dentro de un fragmento de tiempo.

Esta solución no establece una longitud fija del fragmento de tiempo que se usa para cada vela. Puedes establecer la longitud en días o segundos, de acuerdo con los cálculos que necesites más adelante.

Fases clave de la solución

Esta solución construye una canalización en dos fases clave, cada una con sus propias etapas de subprocesamiento.

En el siguiente diagrama, se ilustran las dos fases de la canalización.

Figura 2. En la Fase 1, debes crear rectángulos de datos perfectos. En la Fase 2, debes procesar las correlaciones.


Fase 1: crea rectángulos de datos perfectos


Sería fácil si solo quisieras crear velas mediante los datos en la transmisión. Las funciones analíticas de Dataflow, que puedes configurar con un par de líneas de código, pueden ocuparse de las agregaciones transitivas y asociativas de encontrar los valores open, close, min y max para cada vela en un fragmento de tiempo.


La dificultad es que las series temporales tienen diferentes tasas de liquidez, o marcas, en cada fragmento de tiempo, como se ilustra en el siguiente diagrama.

Figura 3. Los datos que no se actualizaron durante el fragmento de tiempo se muestran como datos faltantes.


Como se observa en la Figura 3, algunas transmisiones tienen datos en cada fragmento de tiempo, y otras tienen datos faltantes. Cuando faltan datos de un fragmento de tiempo, no puedes compilar correlaciones de manera confiable. La ausencia de un valor no significa que el elemento no tenga precio, solo que el precio no se actualizó durante este período.


Cuando comienzas a trabajar en la solución, primero debes crear rectángulos perfectos de datos, es decir, rectángulos con todos los datos faltantes incluidos, como se ilustra en el siguiente diagrama.

Figura 4. Un rectángulo perfecto de datos no contiene datos faltantes.


Como se muestra en la Figura 4, se calcula un valor para cada dato en el fragmento de tiempo.


Los rectángulos perfectos sirven para muchos tipos de procesamientos, por eso, a pesar de que la muestra en esta solución requiere ambas fases dentro de un solo flujo de datos, puedes usar cada fase en canalizaciones diferentes.


Fase 2: procesa las correlaciones


Si usas los rectángulos de datos completos de la Fase 1, puedes proveer los datos a una biblioteca de correlación de Pearson para producir los resultados que deseas. El desafío en esta fase no son los procesamientos de correlación, sino los procesos fan-in y fan-out, que fuerzan la redistribución de una gran cantidad de datos.


Dadas miles de series temporales, la cantidad de pares únicos está en el orden de ((n^2-n)/2), en la que n es la cantidad de series temporales que se procesan.


En la figura que se encuentra a continuación, se muestran los números de pares creados cuando se comparan cuatro valores de monedas.

Figura 5. Números de pares creados cuando se comparan cuatro valores de monedas.


GBP:AUD indica la serie temporal que proporciona el precio de la libra esterlina del Reino Unido en comparación con el dólar australiano con el paso del tiempo (la línea central se ignora porque la correlación de la moneda contra sí misma no es de valor; y los bloques verdes se ignoran porque no hay necesidad de procesar la correlación del mismo par de monedas dos veces).


Seleccionar los valores X genera los seis pares de monedas que se muestran a continuación:

[{GBP:EUR VS GBP:BRL},
{GBP:EUR VS EUR:AUD},
{GBP:EUR VS GBP:JPY},
{GBP:BRL VS EUR:AUD},
{GBP:BRL VS GBP:JPY},
{EUR:AUD VS GBP:JPY}]

Debido a que Cloud Dataflow es un sistema de procesamiento distribuido, ahora tienes un problema de redistribución, es decir, un fan-in tiene que ocurrir antes del fan-out. Los datos por sistema operativo se distribuyen a través de varias máquinas.


A esta altura, puede parecer una buena idea usar Pub/Sub para conectar los dos flujos de datos. Mientras que, desde un punto de vista técnico y funcional, esto es posible, las llamadas de RPC introducirían costos y latencia a los procesamientos. En esta solución, se usan algunos patrones avanzados de Dataflow para realizar toda esta actividad con una latencia muy baja.


Para obtener más información sobre el tamaño del problema de redistribución mencionado con anterioridad, consulta Apéndice A: Calcula bytes redistribuidos.


Revisa cómo funciona la solución


En esta sección, se profundiza sobre los detalles de cada fase y la ejecución del código de solución.


Crear rectángulos de datos perfectos


Crear rectángulos de datos perfectos es un proceso de dos pasos, que son los siguientes:


  • Transportar y fusionar la transmisión de datos.
  • Crear las velas.

Transportar y fusionar la transmisión de datos


Muchas fuentes de datos, tanto internas como de proveedores de terceros, se deben canalizar por un solo sistema hacia las etapas de procesamiento de datos.


En la tabla a continuación, se describe la estructura de datos simplificada que se usó en este artículo. Los datos reales de movimientos de precios contendrán más propiedades.

Tipo Propiedad
Largo Marca de tiempo
String Clave
Doble Precio

Nota: La marca de tiempo almacenada en el dato es la hora del evento. Usa este valor cuando defines en qué ventanas se procesan los datos. Cuando incorpores el mensaje en Pub/Sub, usa la etiqueta de la marca de tiempo.

Las transmisiones de datos se consumen a través de Pub/Sub, que puede procesar millones de mensajes por segundo en un solo tema y transferir los datos a Dataflow. Pub/Sub retiene los datos durante un máximo de 7 días y solo quita los datos de una suscripción activa después de que el suscriptor confirma la recepción del mensaje. Pub/Sub actúa como amortiguador del sistema, se ocupa de cualquier aumento masivo inesperado en el mercado.


Mediante los controles de administración de identidades y accesos, puedes crear primero un tema en un proyecto de la propiedad de un proveedor de datos y, luego, otorgar la habilidad de crear suscripciones con ese tema a personas, grupos o cuentas de servicio.


Crear las velas


Primero creas las velas, que es un proceso de tres pasos, que se indican debajo:


  • Crea los objetos de agregación iniciales, que ignoran los valores faltantes.
  • Genera un marcador de posición de movimiento de precio para cualquier transmisión a la que le falten valores en el período.
  • Propaga los marcadores de posición de movimiento de precio con los valores del período anterior.

Crear objetos de agregación iniciales


A continuación, se muestran los pasos de alto nivel en esta transformación:


  • Lee de todos los temas.
  • Crea ventanas basadas en el tamaño de la vela a la hora del evento.
  • Agrupa por clave, por ejemplo, GBP:EUR, GBP:BRL, EUR:AUD, GBP:JPY.
  • Agregado para Close y Min/Max.

Los tres primeros pasos requieren relativamente pocas líneas de código porque Dataflow se encarga de todo el trabajo de mantenimiento. El código usa las siguientes primitivas de flujos de datos:


  • PubSubIO, para el origen de la lectura
  • Window.into, para definir la duración de las velas
  • GroupByKey, para separar cada una de las claves diferentes

Existen algunos métodos que puedes usar en el paso 4 de la transformación. Cuando se realizan agregaciones, se recomiendan las funciones listas para usar, a menos que necesites aplicar una lógica de negocios personalizada durante la agregación, como en este caso. La agregación incluye la obtención de Min y Max, así como del elemento con la marca de tiempo más alta para determinar el valor Close.


Mediante las marcas de agua de Dataflow, puedes estar seguro de que tienes las velas correctas con los valores Close, Min y Max para cada fragmento de tiempo, según los datos que se enviaron a Pub/Sub. En el siguiente paso, verás cómo calcular el valor Open y tratar las series temporales que no se marcaron en este fragmento de tiempo.


Detecta valores faltantes y genera datos de marcador de posición


En este paso, llenarás los espacios mostrados en la Figura 3 mediante la división de tu código de Dataflow en dos ramas. La rama 1 contiene las velas de los valores en vivo. La rama 2 combina todos los valores en vivo para encontrar la lista única de claves en este período. Comparas la rama 2 con una lista de todos los valores esperados a fin de obtener un conjunto que contenga todos los datos faltantes para este período. Con esta lista, incorporas velas ficticias a la transmisión.

Nota: El sistema de marcas de agua de Dataflow garantiza que todos los datos de todas las ramas se procesen antes de que se emita la siguiente ventana, por lo que no tienes que preocuparte por sincronizar tu sistema para diferentes ventanas.

Propaga los datos de marcador de posición con datos


Ahora tienes un rectángulo perfecto de datos que contiene ambos valores, en vivo y ficticios. Las velas en vivo no tienen el valor Open propagado, mientras que los valores ficticios no contienen otros valores más que la marca de tiempo. En este paso, el valor Close del fragmento de tiempo anterior propaga el valor Open de las velas en vivo y ficticias actuales. Debido a que las velas ficticias ahora tienen un valor Open, la siguiente acción es propagar el valor Open a Close y Min/Max de las velas ficticias.

Figura 6. Ahora todos están completos, menos TS-4.


Como se muestra en la Figura 6, esto completa nuestros rectángulos perfectos, con la excepción de TS-4, el cual no tenía un valor cuando comenzó el flujo de datos en el fragmento de tiempo de t0 a t1. Este es un problema de arranque: según donde estén almacenados los datos, la solución tiene que llamar a los sistemas de almacenamientos externos, por ejemplo, Cloud Bigtable, para obtener el último valor antes de que el flujo de datos comience.


Nota: El código de muestra no realiza esta acción de arranque para que pueda funcionar como un ejemplo independiente.


Ahora tienes rectángulos de datos perfectos que procesan los movimientos a la Fase 2.


Crear estos rectángulos de datos no solo es útil para los análisis de correlación. Puedes almacenar los valores mediante la creación de ramas con Dataflow y enviar los valores calculados al almacenamiento, por ejemplo, BigQuery o Bigtable, siempre que necesites un conjunto completo de valores basados en rectángulos perfectos.


Procesa correlaciones


Para procesar tus correlaciones, debes seguir estos tres pasos:


  • Crea las ventanas operativas.
  • Procesa las correlaciones.
  • Publica y almacena los resultados.

Crear las ventanas operativas


En la Fase 1 se ubicaron los datos en muchos hosts. Para compilar los arreglos de datos desde las velas, primero debes juntar todas las velas desde todas las claves de cada ventana operativa en una sola ubicación. Luego, tienes que copiar los paquetes de datos de cada ventana operativa en varias ubicaciones antes del fan-out de datos a gran escala.


Primero, crea una ventana móvil de los valores generados en la Fase 1 mediante las primitivas del sistema de ventanas de Dataflow. Esto lleva solo una línea de código:

 Window.into(SlidingWindows.of())
Los valores de esta ventana operativa se distribuyen en varios hosts, pero para tus correlaciones debes realizar un fan-in en todos estos datos en una sola ubicación.

El volumen de datos y la sobrecarga de administración que se generaron durante el fan-out es órdenes de magnitud más grande que los datos que contiene la ventana operativa en sí misma. Debes distribuir varias copias de cada ventana operativa antes del fan-out, lo que permite que varios hosts realicen el procesamiento. Para esto, usa SideInputscon ventanas de Dataflow a fin de enviar varias copias de los datos a las máquinas que procesan un subconjunto de los pares de claves.

Ahora tienes una copia completa de todos los datos que necesitas para el fan-out en varios hosts, como se muestra en la Figura 7.

Figura 7. Procesamiento de fan-out a varios hosts.


A continuación, escribe una transformación que genera la tupla de datos para todas las correlaciones.


Procesar las correlaciones


En esta etapa, el trabajo duro está hecho. Los paquetes de trabajo contienen todos los datos necesarios a fin de calcular las correlaciones para esta ventana operativa. Cada paquete de trabajo es un rectángulo perfecto. Ambos lados del procesamiento de la correlación contienen un número de valores iguales, que pueden pasarse a la biblioteca de algoritmo de correlación para que realice el procesamiento.


Luego, tienes que decidir qué procesamientos son de interés para tu negocio. Si deseas usar estos datos para la representación visual, recomendamos que reduzcas la cantidad de datos enviados a fin de que solo contengan correlaciones que estén por encima del valor ABS(correlation) que especifiques. O puedes enviar todos los valores a los sistemas de publicación y almacenamiento. Esta es una función simple de costos para adaptar los procesamientos (n^n-n)/2 que se pueden generar.

Nota: En este ejemplo, las correlaciones son muy livianas, por lo que no necesitas redistribuir el trabajo. Una sola máquina puede trabajar en segundos a través de ventanas incluso más grandes. Por lo tanto, aunque el agregado es una etapa separada, en este ejemplo, permitirás que la fusión de Dataflow haga su trabajo y contraiga la creación de tuplas y el código de procesamiento en uno.

Publica y almacena los resultados


Los requisitos para esta etapa dependen del uso que harás de los datos más adelante. Puedes enviar los resultados a tres ubicaciones, todas opcionales y no exclusivas entre sí:


  • Pub/Sub. Además de realizar la transferencia, Pub/Sub también puede actuar como la unión entre los sistemas vinculados de manera flexible. Puedes enviar los datos procesados a otros sistemas para consumirlos; por ejemplo, puedes enviar todas las correlaciones con más del valor de ABS(0.2) a otros sistemas.
  • BigQuery. Ubica cualquier dato que quieras procesar o al que quieras acceder más tarde con una interfaz de SQL a BigQuery.   
  • Bigtable. Coloca los datos que quieras usar para el almacenamiento de baja latencia o en el que quieras obtener un subconjunto muy pequeño de un conjunto de datos más grande (búsquedas de claves y análisis de rango) en Bigtable.

Próximos pasos


Obtén el código para esta solución en GitHub.

The World Beyond Batch Streaming 101 (“El mundo más allá de los lotes: introducción a la transmisión”).

The World Beyond Batch Streaming 102 (“El mundo más allá de los lotes: más información acerca de la transmisión”).

Prueba otras características de Google Cloud. Revisa los  instructivos oficiales.


Apéndice: calcular bytes redistribuidos


En este apéndice, se usan valores de cinco series temporales para que el ejemplo sea simple:


Supongamos que la ventana operativa es de 10 minutos y quieres usar velas que duran 1 minuto. En este caso, por cada correlación necesitas 10 datos por serie temporal. Para simplificarlo, supongamos que cada dato es de 1 byte, lo que da 10 * 1 = 10 bytes por clave. Hay 5 series temporales, lo que da 5 * 10 = 50 bytes por ventana operativa.


Situación 1: distribución inmediata después de la convergencia¿Cuántos valores se crean durante la distribución? Como solo hay 5, se verá de la siguiente manera:


{1-2, 1-3, 1-4, 1-5, 2-3, 2-4, 2-5, 3-4, 3-5, 4-5} = 10 pares O ((n^2-n)/2) en el que n es la cantidad de series temporales, (5^2 - 5)/2 = 10


Cada par tiene 10 * 2 = 20 bytes de datos y hay 10 pares, entonces tienes 200 bytes de datos para redistribuir en el sistema.


No está mal, pero si haces el mismo cálculo por 1000 series temporales, tienes (1000^1000-1000)/2 = 499500, que da 499500 * 10 * 2 = 9,990,000 bytes para redistribuir.


Si miramos los datos en una ventana operativa sin la distribución en forma de pares, solo tienes 1,000 * 10 = 10000 bytes para mover. Es más eficiente copiar la ventana operativa a varias particiones y, luego, crear correlaciones en esas particiones.