Logs en streaming

Una guía rápida de cómo echar a andar tu primer consumer

Si quieres activar el servicio de Streaming de logs en tiempo real, contacta con nosotros en support@transparentcdn.com solicitando el alta.

Al darte de alta en el servicio de Streaming de logs en tiempo real, recibirás por nuestra parte un correo con la configuración necesaria y tu certificado de cliente para poder autenticarte contra nuestros Brokers.

En el correo podrás encontrar lo siguiente:

  • La dirección de nuestros brokers:

    • kafka1.edgetcdn.com

    • kafka2.edgetcdn.com

    • kafka3.edgetcdn.com

  • El puerto a utilizar, que será el 9093

  • Certificado público de cliente <ID>.crt.pem

  • Certificado privado encriptado <ID>.encrypted.key.pem

  • Keystore en formato PKCS12 <ID>.keystore.p12

  • Contraseña del certificado privado y la keystore

  • El certificado público de nuestra CA TransparentCDN.pem

  • El Topic al que suscribirse

  • Los Consumer Group al que unir tus consumers

Nosotros necesitaremos:

  • La(s) dirección(es) IP(s) desde donde conectarán tus consumers.

Consumiendo los logs

En la actualidad existen multitud de destinos para tus logs, te podría interesar el ingestarlos en un elasticsearch para hacer análitica de datos, o tal vez subirlos a un servicio de terceros como Datadog o Amazon S3, las opciones son casi infinitas y va a depender mucho de tus necesidades de negocio.

Es por esto que, siendo fieles a nuestra filosofia de hacer las cosas lo más simples posible, te vamos a proponer que uses dos herramientas ampliamente utilizadas en la comunidad como son Filebeat o/y Logstash para consumir tus logs de nuestro sistema de Logs en Streaming.

Filebeat vs Logstash

Es muy común, sobre todo para gente que no está familiarizada con este tipo de tecnologías el confundir cuando usar Logstash, cuando usar Filebeat o cuando usarlos juntos que también se puede. Aquí vamos a intentar explicarlo de una manera un poco somera pero lo suficientemente sencilla como para poder tomar una decisión al respecto.

Logstash es un programa escrito en java parte del stack ELK (ElasticSearch - Logstash - Kibana) desarrollado y mantenido por la compañía ElasticSearch.

Filebeat sin embargo está escrito en Go por la misma compañía y surgio como respuesta a una necesidad incipiente de la comunidad de tener una herramienta ligera para transportar logs ya que logstash consume bastante más que Filebeat al estar escrito en java.

Filebeat como digo es un software muy liviano que te permite transportar logs de un sitio a otro, lo mismo que Logstash (este segundo no tan liviano), sin embargo, Logstash es mucho más versatil y potente de Filebeat y te permite consumir logs (Inputs) de un número mayor de sitios y enviarlos también a un número mayor de salidas (Ouputs).

Aquí os dejo los enlaces a los Input y Output de Logstash y Filebeat

Por tanto usar Logstash o Filebeat para sacar los logs de nuestro sistema de Logs en Streaming va a depender de tus necesidades, principalmente de el destino final de los mismos, logs por segundo y si quieres hacer algún tipo de transformación con los mismos.

Resumiendo, nuestra recomendación es que uses Filebeat siempre que puedas ya que es más ligero y fácil de configurar. Si necesitas alguna salida que no está en Filebeat o hacer alguna transformación usa Logstash.

Recuerda que siempre tendrás una tercera opción que también es válida y contemplamos en esta documentación y es que escribas tu propio consumer usando tu lenguaje de programación favorito.

Consumir logs mediante Filebeat

Veamos ahora un despliegue sencillo de Filebeat en un servidor Debian como primera toma de contacto en que vamos a dejar el log en un fichero de texto.

La documentación oficial se puede encontrar en: https://www.elastic.co/es/beats/filebeat

Usemos estos datos de ejemplo:

  • Certificados c83.crt.pem y c83.encrypted.key.pem

  • Contraseña: password

  • Topic: c83

  • Consumer group: c83_0

Primero descargamos e instalamos el paquete de Filebeat en nuestro servidor:

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.0-amd64.deb
sudo dpkg -i filebeat-7.9.0-amd64.deb

Habilitamos el módulo de kafka:

filebeat modules enable kafka
filebeat setup -e

Editamos la configuración de Filebeat /etc/filebeat/filebeat.yml y dejamos el fichero como muestra a continuación:

filebeat.inputs:
- type: kafka
hosts:
- kafka1.edgetcdn.com:9093
- kafka2.edgetcdn.com:9093
- kafka2.edgetcdn.com:9093
topics: ["c83"]
group_id: "c83_0"
initial_offset: "newest"
ssl.enabled: yes
ssl.certificate: "/root/secret/c83.crt.pem"
ssl.key: "/root/secret/c83.encrypted.key.pem"
ssl.key_passphrase: "password"
ssl.certificate_authorities:
- /usr/local/share/ca-certificates/transparentcdnCA.pem
output.file:
codec.format:
string: '%{[message]}'
path: "/tmp"
filename: kafka-filebeat.out
rotate_every_kb: 50000

A continuación, creamos el directorio para los logs:

mkdir /var/log/tcdn_streaming

En el servidor donde configuremos Filebeat, copiamos la clave pública y privada del certificado así como la CA de Transparent CDN a las rutas indicadas en la configuración, o las que se prefieran usar en este caso, ya que sólo son un ejemplo rápido.

Si todo ha ido bien, podrás ver cómo entran los logs de tus Sitios en /var/log/tcdn_streaming:

root@filebeat:/var/log/tcdn_streaming# ls -lrt
total 4
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.7
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.6
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.5
-rw------- 1 root root 49M ago 27 08:44 kafka-filebeat.out.4
-rw------- 1 root root 49M ago 27 08:44 kafka-filebeat.out.3
-rw------- 1 root root 49M ago 27 08:45 kafka-filebeat.out.2
-rw------- 1 root root 49M ago 27 08:51 kafka-filebeat.out.1
-rw------- 1 root root 4,6M ago 27 08:52 kafka-filebeat.out

Consumir logs mediante Logstash

Ahora veamos cómo consumir nuestros logs mediante Logstash, utilizaremos los mismos datos de ejemplo anteriores.

Nota: Usaremos el Keystore y el Truststore en lugar del par de claves privada y pública, tendrás que copiarlos al servidor donde ejecutes Logstash. Además, el servicio de Systemd por defecto utiliza el usuario Logstash, por lo que éste deberá tener permisos de lectura sobre ellos. Para el ejemplo los dejaremos en /etc/logstash/certs.

Instalaremos Logstash desde la paquetería oficial, ejecuta los siguientes comandos para agregar el repositorio e instalar Logstash. O bien sigue la guía oficial en https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
apt install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
apt update
apt install logstash

Ahora configuraremos un pipeline, que consumirá los logs desde nuestros servidores de Kafka y los volcará en un fichero en formato JSON, categorizando cada campo del log.

Recuerda que Logstash ofrece múltiples entradas/salidas a distintos sistemas y te permite personalizar y mutar los logs, más info en: (Input plugins, Output plugins).

Para ello crea un fichero nuevo en /etc/logstash/conf.d/kafka.conf con el siguiente contenido (edita los valores para que encajen con tu configuración):

input {
kafka {
bootstrap_servers => "kafka1.edgetcdn.com:9093,kafka2.edgetcdn.com:9093,kafka2.edgetcdn.com:9093"
topics => "c83"
group_id => "c83_1"
auto_offset_reset => "latest"
security_protocol => "SSL"
ssl_keystore_location => "/etc/logstash/certs/c83.keystore.p12"
ssl_keystore_password => "password"
ssl_truststore_location => "/etc/logstash/certs/truststore.p12"
ssl_truststore_password => "password"
}
}
filter {
grok {
match => {
"message" => [
"%{DATA:clientip} - %{DATA:user} \[(.*)\] \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:useragent}\" %{DATA:hitmiss} \"%{DATA:content-type}\" \"%{DATA:layer}\" %{NUMBER:requesttime} \"%{DATA:clientid}\" \"%{DATA:referer}\" %{DATA:forwardedproto} %{DATA:country}(.*)",
"%{DATA:clientip} - %{DATA:user} \[(.*)\] %{DATA:vod_host} \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:referer}\" \"%{DATA:useragent}\" \"%{DATA:content-type}\" \"%{DATA:hitmiss}\" \"%{DATA:layer}\" \"%{DATA:clientid}\" %{NUMBER:requesttime} %{DATA:forwardedproto} %{DATA:country}(.*)",
"%{DATA:clientip} - %{DATA:user} \[(.*)\] %{DATA:vod_host} \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:referer}\" \"%{DATA:useragent}\" \"%{DATA:content-type}\" \"%{DATA:hitmiss}\" \"%{DATA:layer}\" \"%{DATA:clientid}\" %{NUMBER:requesttime} %{DATA:forwardedproto}(.*)"
]
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
mutate { remove_field => [ "message" ] }
}
output {
file {
path => "/var/log/tcdn_streaming/logstash.out"
codec => json_lines
}
}

Ahora sólo resta iniciar Logstash, podemos hacerlo por línea de comandos para verificar que todo funciona con:

sudo -u logstash /usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"O bien, iniciar el servicio:

Recuerda verificar que todos los archivos necesarios son accesibles por el usuario "logstash", incluyendo los certificados, y que la carpeta de destino, si estás usando la salida a fichero del ejemplo, tienen permisos de escritura.

systemctl start logstash.service

Al cabo de unos segundos, comenzarás a recibir los logs en el fichero o salida que hayas indicado en formato JSON, te dejamos un ejemplo:

root@logstash:/var/log/tcdn_streaming# tail -1 logstash.out | jq
{
"requesttime": "0.000193",
"hitmiss": "hit",
"clientid": "83",
"verb": "GET",
"content-type": "application/javascript",
"httpversion": "HTTP/1.1",
"bytes": "621",
"request": "http://www.ejemplo.com/build/js/Core/Core.3160595ce5a674e1205b409c3e53616c4b44a9b3.js",
"referer": "https://referer.ejemplo.com/",
"user": "-",
"forwardedproto": "https",
"clientip": "11.11.222.111",
"@version": "1",
"statuscode": "200",
"layer": "L1",
"useragent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Mobile/15E148 Safari/604.1",
"@timestamp": "2020-09-14T12:44:19.475Z"
}

Consumir logs mediante un script de Python a medida

Se puede programar un consumer en muchos lenguajes distintos, vamos a ver un ejemplo con Python, utilizando el cliente o librería de "confluent-kafka-python" https://github.com/confluentinc/confluent-kafka-python

Otra opción también muy popular es https://github.com/dpkp/kafka-python

Vamos con el ejemplo (nuevamente, usaremos Debian como sistema operativo):

Instalamos los paquetes necesarios:

sudo apt install python3-pip
pip3 install confluent-kafka --user

Creamos el fichero python con el siguiente contenido, la parte importante estaría en el apartado de configuración, edítala para que se ajuste a tus datos:

Nota: en este ejemplo, usamos el Keystore en lugar del par de claves privada y pública, tendrás que copiarlo al servidor donde ejecutes el consumer.

#!/usr/bin/python3
import sys
import logging
import socket
from confluent_kafka import Consumer
LOG_FMT = '[%(asctime)s][%(levelname)s] %(message)s'
logging.basicConfig(
stream=sys.stdout,
format=LOG_FMT,
datefmt='%Y.%m.%d %H:%M:%S',
level=logging.INFO)
def assigned(consum, parti):
logging.info("Assigned consumer: %s on partition %s", repr(consum), repr(parti))
def revoked(consum, parti):
logging.error("Failed to assign consumer: %s on partition %s", repr(consum), repr(parti))
if __name__ == "__main__":
# CONFIGURACION
SERVERS = 'kafka1.edgetcdn.com:9093,kafka2.edgetcdn.com:9093,kafka3.edgetcdn.com:9093'
TOPIC = 'c83'
CONF = {'bootstrap.servers': SERVERS,
'client.id': socket.gethostname(),
'security.protocol': 'SSL',
'ssl.ca.location': '/usr/local/share/ca-certificates/transparentcdnCA.pem',
'ssl.keystore.location': '/root/secret/c83.keystore.p12',
'ssl.keystore.password': 'password',
'group.id': TOPIC + '_0'}
# FIN CONFIGURACION
consumer = Consumer(CONF)
consumer.subscribe([TOPIC], on_assign=assigned, on_revoke=revoked)
count = 0
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
#logging.info("Waiting for message or event/error in poll()")
continue
if msg.error():
logging.error(msg.error())
else:
value = msg.value()
offset = msg.offset()
topic = msg.topic()
logging.info("Mensaje recibido numero {} - Value: \"{}\" - Topic: {} - Offset: {}".format(
str(count),
value.decode('utf-8'),
str(topic),
str(offset)))
count += 1
except KeyboardInterrupt:
pass
finally:
consumer.close()

Si todo es correcto, al iniciar el consumer recibirás el siguiente mensaje y empezarás a consumir del topic:

root@server1:~# ./consumer.py
[2020.08.21 09:57:21][INFO] Assigned consumer: <cimpl.Consumer object at 0x7fada8213f28> on partition [TopicPartition{topic=c83,partition=0,offset=-1001,error=None}, TopicPartition{topic=c83,partition=1,offset=-1001,error=None}]