Logs en streaming

Documentación y una guía rápida de cómo echar a andar tu primer consumer

Si tienes cualquier duda sobre el servicio de Streaming de logs en tiempo real, contacta con nosotros en support@transparentcdn.com.

Para activar el servicio de Streaming de logs en tiempo real, tan sólo debes acceder al panel en https://dashboard.transparentcdn.com/ y dentro del apartado Logs, en la pestaña Streaming, encontrarás todo lo necesario.

Una vez activado el servicio, podrás descargarte un archivo zip que contiene los certificados digitales necesarios para autenticar a tus consumers así como una serie de plantillas preconfiguradas con tus datos para consumir los logs mediante filebeat, logstash y python.

Además, puedes agregar fácilmente las direcciones IP en las cuales instalarás el/los consumer, para que automáticamente se ajusten las reglas del firewall necesarias en nuestros brokers.

Las plantillas, vienen preconfiguradas con todos los datos necesarios, pero a continuación, vamos a enumerar el contenido del zip y ciertos requisitos y parámetros importantes a considerar, uno muy importante es el Consumer Group.

Parámetros de conexión:

  • La dirección de nuestros brokers:

    • kafka1.edgetcdn.com

    • kafka2.edgetcdn.com

    • kafka3.edgetcdn.com

  • El puerto a utilizar, que será el 9093

Contenido del zip:

  • Certificado público de cliente c<ID>.crt.pem

  • Certificado privado c<ID>.key.pem

  • Keystore en formato PKCS12 c<ID>.keystore.p12

  • La contraseña utilizada para cifrar el keystore y el truststore: password.txt

  • El certificado público de nuestra CA transparentcdnCA.pem

  • Truststore con nuestra CA (necesario en algunos consumers): truststore.p12

  • Plantilla para Filebeat: filebeat.yml

  • Plantilla para Logstash: kafka-logstash.conf

  • Consumer sencillo en Python: consumer.py

Otros datos:

  • El Topic al que suscribirse que será c<ID>

  • El prefijo de Consumer Group al que unir tus consumers. Por ejemplo, si tu <ID> (identificador de cliente) es el 83, te suscribirás al topic c83, y podrás unir tus consumers a cualquier "Consumer Group" que comience por c83_ como por ejemplo c83_group1, c83_test, c83_pre ... Puedes consultar más información sobre los consumer groups aquí.

Nosotros necesitaremos:

  • La(s) dirección(es) IP(s) desde donde conectarán tus consumers. Que las puedes añadir desde el propio panel (al añadirlas, se necesita un margen de 5 minutos hasta que estén activas en nuestro firewall):

Consumiendo los logs

En la actualidad existen multitud de destinos para tus logs, te podría interesar el ingestarlos en un elasticsearch para hacer análitica de datos, o tal vez subirlos a un servicio de terceros como Datadog o Amazon S3, las opciones son casi infinitas y va a depender mucho de tus necesidades de negocio.

Es por esto que, siendo fieles a nuestra filosofia de hacer las cosas lo más simples posible, te vamos a proponer que uses dos herramientas ampliamente utilizadas en la comunidad como son Filebeat o/y Logstash para consumir tus logs de nuestro sistema de Logs en Streaming.

Filebeat vs Logstash

Es muy común, sobre todo para gente que no está familiarizada con este tipo de tecnologías el confundir cuando usar Logstash, cuando usar Filebeat o cuando usarlos juntos que también se puede. Aquí vamos a intentar explicarlo de una manera un poco somera pero lo suficientemente sencilla como para poder tomar una decisión al respecto.

Logstash es un programa escrito en java parte del stack ELK (ElasticSearch - Logstash - Kibana) desarrollado y mantenido por la compañía ElasticSearch.

Filebeat sin embargo está escrito en Go por la misma compañía y surgio como respuesta a una necesidad incipiente de la comunidad de tener una herramienta ligera para transportar logs ya que logstash consume bastante más que Filebeat al estar escrito en java.

Filebeat como digo es un software muy liviano que te permite transportar logs de un sitio a otro, lo mismo que Logstash (este segundo no tan liviano), sin embargo, Logstash es mucho más versatil y potente de Filebeat y te permite consumir logs (Inputs) de un número mayor de sitios y enviarlos también a un número mayor de salidas (Ouputs).

Aquí os dejo los enlaces a los Input y Output de Logstash y Filebeat

Por tanto usar Logstash o Filebeat para sacar los logs de nuestro sistema de Logs en Streaming va a depender de tus necesidades, principalmente de el destino final de los mismos, logs por segundo y si quieres hacer algún tipo de transformación con los mismos.

Resumiendo, nuestra recomendación es que uses Filebeat siempre que puedas ya que es más ligero y fácil de configurar. Si necesitas alguna salida que no está en Filebeat o hacer alguna transformación usa Logstash.

Recuerda que siempre tendrás una tercera opción que también es válida y contemplamos en esta documentación y es que escribas tu propio consumer usando tu lenguaje de programación favorito.

Consumir logs mediante Filebeat

Veamos ahora un despliegue sencillo de Filebeat en un servidor Debian como primera toma de contacto en que vamos a dejar el log en un fichero de texto.

La documentación oficial se puede encontrar en: https://www.elastic.co/es/beats/filebeat

Usaremos estos datos de ejemplo pero recuerda que el zip que descargaste tras activar el servicio ya contiene una plantilla con todos los datos necesarios llamada filebeat.yml

  • Certificados c83.crt.pem y c83.key.pem

  • Contraseña: password

  • Topic: c83

  • Consumer group: c83_filebeat

Primero descargamos e instalamos el paquete de Filebeat en nuestro servidor:

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.0-amd64.deb
sudo dpkg -i filebeat-7.9.0-amd64.deb

Habilitamos el módulo de Kafka:

filebeat modules enable kafka
filebeat setup -e

Editamos la configuración de Filebeat /etc/filebeat/filebeat.yml y pegamos los datos que tenemos en la plantilla filebeat.yml, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas o si modificas la ruta donde se volcarán los ficheros:

  • ssl.certificate: Ubicación de c<ID>.crt.pem

  • ssl.key: Ubicación de c<ID>.key.pem

  • ssl.certificate_authorities: Ubicación de transparentcdnCA.pem

  • path: Ruta final donde se depositarán los logs que consuma Filebeat.

filebeat.inputs:
- type: kafka
hosts:
- kafka1.edgetcdn.com:9093
- kafka2.edgetcdn.com:9093
- kafka2.edgetcdn.com:9093
topics: ["c83"]
group_id: "c83_filebeat"
initial_offset: "newest"
ssl.enabled: yes
ssl.certificate: "/etc/filebeat/secret/c83.crt.pem"
ssl.key: "/etc/filebeat/secret/c83.key.pem"
ssl.key_passphrase: "password"
ssl.certificate_authorities:
- /etc/filebeat/secret/transparentcdnCA.pem
output.file:
codec.format:
string: '%{[message]}'
path: "/tmp/filebeat"
filename: kafka-filebeat.out
rotate_every_kb: 50000

En el servidor donde configuremos Filebeat, copiamos la clave pública y privada del certificado así como la CA de Transparent CDN a las rutas que hayamos definido finalmente en la configuración.

También deberás crear la carpeta definida en el path: en el caso de que no exista.

Una vez configurado todo, tan sólo tendrás que iniciar el servicio de Filebeat, normalmente mediante systemd con systemctl start filebeat, y si todo ha ido bien, verás como se empiezan a consumir los logs en la ruta que hayas definido en path:

root@filebeat:/tmp/filebeat# ls -lrt
total 4
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.7
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.6
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.5
-rw------- 1 root root 49M ago 27 08:44 kafka-filebeat.out.4
-rw------- 1 root root 49M ago 27 08:44 kafka-filebeat.out.3
-rw------- 1 root root 49M ago 27 08:45 kafka-filebeat.out.2
-rw------- 1 root root 49M ago 27 08:51 kafka-filebeat.out.1
-rw------- 1 root root 4,6M ago 27 08:52 kafka-filebeat.out

Consumir logs mediante Logstash

Ahora veamos cómo consumir nuestros logs mediante Logstash.

Nota: Usaremos el Keystore y el Truststore en lugar del par de claves privada y pública, tendrás que copiarlos al servidor donde ejecutes Logstash. Además, el servicio de Systemd por defecto utiliza el usuario Logstash, por lo que éste deberá tener permisos de lectura sobre ellos. Para el ejemplo los dejaremos en /etc/logstash/certs.

Instalaremos Logstash desde la paquetería oficial, ejecuta los siguientes comandos para agregar el repositorio e instalar Logstash. O bien sigue la guía oficial en https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
apt install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
apt update
apt install logstash

Ahora configuraremos un pipeline, que consumirá los logs desde nuestros servidores de Kafka y los volcará en un fichero en formato JSON, categorizando cada campo del log.

Recuerda que Logstash ofrece múltiples entradas/salidas a distintos sistemas y te permite personalizar y mutar los logs, más info en: (Input plugins, Output plugins).

Para ello crea un fichero nuevo en /etc/logstash/conf.d/kafka-logstash.conf con el contenido que has recibido en la plantilla kafka-logstash.conf en el zip que descargaste desde el panel, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas o si modificas la ruta donde se volcarán los ficheros:

  • ssl_keystore_location: Ubicación de c<ID>.keystore.p12

  • ssl_truststore_location: Ubicación de truststore.p12

  • path => : Ruta al fichero donde se volcarán los logs.

input {
kafka {
bootstrap_servers => "kafka1.edgetcdn.com:9093,kafka2.edgetcdn.com:9093,kafka2.edgetcdn.com:9093"
topics => "c83"
group_id => "c83_logstash"
auto_offset_reset => "latest"
security_protocol => "SSL"
ssl_keystore_location => "/etc/logstash/certs/c83.keystore.p12"
ssl_keystore_password => "password"
ssl_truststore_location => "/etc/logstash/certs/truststore.p12"
ssl_truststore_password => "password"
}
}
filter {
grok {
match => {
"message" => [
"%{DATA:clientip} - %{DATA:user} \[(.*)\] \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:useragent}\" %{DATA:hitmiss} \"%{DATA:content-type}\" \"%{DATA:layer}\" %{NUMBER:requesttime} \"%{DATA:clientid}\" \"%{DATA:referer}\" %{DATA:forwardedproto} %{DATA:country}(.*)",
"%{DATA:clientip} - %{DATA:user} \[(.*)\] %{DATA:vod_host} \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:referer}\" \"%{DATA:useragent}\" \"%{DATA:content-type}\" \"%{DATA:hitmiss}\" \"%{DATA:layer}\" \"%{DATA:clientid}\" %{NUMBER:requesttime} %{DATA:forwardedproto} %{DATA:country}(.*)",
"%{DATA:clientip} - %{DATA:user} \[(.*)\] %{DATA:vod_host} \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:referer}\" \"%{DATA:useragent}\" \"%{DATA:content-type}\" \"%{DATA:hitmiss}\" \"%{DATA:layer}\" \"%{DATA:clientid}\" %{NUMBER:requesttime} %{DATA:forwardedproto}(.*)"
]
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
mutate { remove_field => [ "message" ] }
}
output {
file {
path => "/var/log/tcdn_streaming/logstash.out"
codec => json_lines
}
}

Ya que el servicio se ejecutará con el usuario logstash, nos aseguramos de tener los permisos correctos, tanto para los certificados, como la configuración, como el directorio final donde escribirá logstash:

chown logstash:logstash -R /etc/logstash/
mkdir /var/log/tcdn_streaming
chown logstash:logstash /var/log/tcdn_streaming

Ahora sólo resta iniciar Logstash, podemos hacerlo por línea de comandos para verificar que todo funciona con:

sudo -u logstash /usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"

Recuerda verificar que todos los archivos necesarios son accesibles por el usuario "logstash", incluyendo los certificados, y el fichero + carpeta de destino.

Al cabo de unos segundos, comenzarás a recibir los logs en el fichero o salida que hayas indicado en formato JSON, te dejamos un ejemplo:

root@logstash:/var/log/tcdn_streaming# tail -1 logstash.out | jq
{
"requesttime": "0.000193",
"hitmiss": "hit",
"clientid": "83",
"verb": "GET",
"content-type": "application/javascript",
"httpversion": "HTTP/1.1",
"bytes": "621",
"request": "http://www.ejemplo.com/build/js/Core/Core.3160595ce5a674e1205b409c3e53616c4b44a9b3.js",
"referer": "https://referer.ejemplo.com/",
"user": "-",
"forwardedproto": "https",
"clientip": "11.11.222.111",
"@version": "1",
"statuscode": "200",
"layer": "L1",
"useragent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Mobile/15E148 Safari/604.1",
"@timestamp": "2020-09-14T12:44:19.475Z"
}

Si todo ha ido bien, puedes cancelar el comando anterior y dejar el servicio ejecutándose con:

systemctl start logstash.service

Consumir logs mediante un script de Python a medida

Se puede programar un consumer en muchos lenguajes distintos, vamos a ver un ejemplo con Python, utilizando el cliente o librería de "confluent-kafka-python" https://github.com/confluentinc/confluent-kafka-python

Otra opción también muy popular es https://github.com/dpkp/kafka-python

Vamos con el ejemplo (nuevamente, usaremos Debian como sistema operativo):

Instalamos los paquetes necesarios:

sudo apt install python3-pip
pip3 install confluent-kafka --user

Creamos el fichero python con el contenido que recibiste en la plantilla consumer.py, abajo se muestran unos datos de ejemplo, la parte importante estaría en el apartado de configuración, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas:

  • ssl.ca.location: Ubicación de transparentcdnCA.pem

  • ssl.keystore.location: Ubicación de c<ID>.keystore.p12

#!/usr/bin/python3
import sys
import logging
import socket
from confluent_kafka import Consumer
LOG_FMT = '[%(asctime)s][%(levelname)s] %(message)s'
logging.basicConfig(
stream=sys.stdout,
format=LOG_FMT,
datefmt='%Y.%m.%d %H:%M:%S',
level=logging.INFO)
def assigned(consum, parti):
logging.info("Assigned consumer: %s on partition %s", repr(consum), repr(parti))
def revoked(consum, parti):
logging.error("Failed to assign consumer: %s on partition %s", repr(consum), repr(parti))
if __name__ == "__main__":
# CONFIGURACION
SERVERS = 'kafka1.edgetcdn.com:9093,kafka2.edgetcdn.com:9093,kafka3.edgetcdn.com:9093'
TOPIC = 'c83'
CONF = {'bootstrap.servers': SERVERS,
'client.id': socket.gethostname(),
'security.protocol': 'SSL',
'ssl.ca.location': '/usr/local/share/ca-certificates/transparentcdnCA.pem',
'ssl.keystore.location': '/root/secret/c83.keystore.p12',
'ssl.keystore.password': 'password',
'group.id': TOPIC + '_python'}
# FIN CONFIGURACION
consumer = Consumer(CONF)
consumer.subscribe([TOPIC], on_assign=assigned, on_revoke=revoked)
count = 0
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
#logging.info("Waiting for message or event/error in poll()")
continue
if msg.error():
logging.error(msg.error())
else:
value = msg.value()
offset = msg.offset()
topic = msg.topic()
logging.info("Mensaje recibido numero {} - Value: \"{}\" - Topic: {} - Offset: {}".format(
str(count),
value.decode('utf-8'),
str(topic),
str(offset)))
count += 1
except KeyboardInterrupt:
pass
finally:
consumer.close()

Si todo es correcto, al iniciar el consumer recibirás el siguiente mensaje y empezarás a consumir del topic:

root@server1:~# ./consumer.py
[2020.08.21 09:57:21][INFO] Assigned consumer: <cimpl.Consumer object at 0x7fada8213f28> on partition [TopicPartition{topic=c83,partition=0,offset=-1001,error=None}, TopicPartition{topic=c83,partition=1,offset=-1001,error=None}]

¿Qué son los Consumer Groups?

Tradicionalmente los Brókers de mensajería (Message Brokers), actuaban de dos formas:

  • Queue: El mensaje se publica una vez, se consume una vez.

  • Pub/Sub: El mensaje se publica una vez, se consume múltiples veces.

Kafka puede funcionar de las dos formas gracias a los Consumer Groups:

  • Si queremos actuar como una cola (Queue), ponemos todos los consumers en el mismo consumer group.

  • Si queremos actuar como un Pub/Sub, cada consumer va en un grupo distinto.

Ejemplos

Actualmente, creamos los topics con 2 particiones por defecto (se pueden aumentar si se solicita), vamos a ver unos ejemplos con un topic de 2 particiones:

  • Iniciamos 3 consumers, los 3 en el mismo consumer group: Uno de ellos, consume de la partición 0, otro de la 1, y el último queda totalmente parado. Conseguimos procesamiento en paralelo y alta disponibilidad. (La alta disponibilidad también la logramos con 2 consumers, si uno falla, el otro consumirá de las dos particiones). Los mensajes estarán repartidos entre el consumer 1 y el consumer 2.

  • Iniciamos 2 consumers, cada uno en distinto consumer group: Los 2 van a recibir TODOS los mensajes del topic y estarán totalmente aislados. Es útil si queremos realizar un procesamiento distinto de los mensajes recibidos para cada uno de ellos. A este esquema podemos añadir más consumers, el resultado será el mismo, cada uno de ellos recibirá todos los mensajes de todas las particiones.

Dado que trabajamos con logs y a no ser que se requieran varios post-procesos distintos, lo más interesante es tener los consumers en el mismo consumer group, y es más que probable que solamente un consumer sea suficiente dado el rendimiento que ofrece Kafka. Se pueden iniciar mas consumers si uno de ellos no puede consumir en tiempo real, o si queremos procesamiento en paralelo + alta disponibilidad.