A algum tempo fiz um post sobre Apache Kafka explicando os conceitos principais como o funcionamento, produtores, consumidores, tópicos e outros. Também escrevi sobre um pouco da história do Kafka e aplicabilidade. Para esse post pretendo dar continuidade ao assunto, mostrando na prática o funcionamento dessa ferramenta.

Para ganhar conhecimento, adicione coisas todos os dias. Para ganhar sabedoria, elimine coisas todos os dias.

– Lao-Tsé

Configurando o ambiente

Inicialmente é necessário ter o Kafka rodando na máquina para poder praticar a codificação. No endereço https://kafka.apache.org/quickstart ensina como baixar, configurar e executar com um passo a passo bem explicativo. Para esse artigo irá ser utilizado um container mantido pela Confluent devido à comodidade que a abordagem possibilita.

Antes de começar

Para seguir o post é necessário instalar as ferramentas necessárias que são o Docker, Docker Compose e o GIT. Para instalar o Docker em Linux baseado em Ubuntu siga as instruções encontradas em https://docs.docker.com/engine/install/ubuntu/ para o Windows siga as instruções em https://docs.docker.com/docker-for-windows/install/.

O Docker Compose pode ser instalado seguindo os passos em https://docs.docker.com/compose/install/#install-compose e o GIT em https://git-scm.com/book/en/v2/Getting-Started-Installing-Git.

Instalação e inicialização do Kafka

Para facilitar será utilizado um arquivo docker-compose disponível no repositório GIT mantido pela Confluent em https://github.com/confluentinc/kafka-images.

Primeiro passo é clonar o projeto:

git clone https://github.com/confluentinc/kafka-images.git

Após copiar o projeto, deve-se ir até a pasta ./kafka-images/examples/kafka-single-node. Na pasta irá encontrar o seguinte arquivo docker-compose.yml:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Esse arquivo irá subir o Zookeper e o Kafka na máquina, para isso utilizamos o comando docker-compose up -d. Com isso os servidores serão executados em segundo plano, pode ser verificado com docker-compose ps:

               Name                           Command            State              Ports            
-----------------------------------------------------------------------------------------------------
kafka-producer-example_kafka_1       /etc/confluent/docker/run   Up      0.0.0.0:9092->9092/tcp      
kafka-producer-example_zookeeper_1   /etc/confluent/docker/run   Up      2181/tcp, 2888/tcp, 3888/tcp

É possível checar se o Kafka inicializou corretamente através do comando docker-compose logs kafka | grep -i started, que irá mostrar uma listagem parecido com:

kafka_1      | [2020-09-16 17:09:48,718] INFO [SocketServer brokerId=1] Started 2 acceptor threads for data-plane (kafka.network.SocketServer)
kafka_1      | [2020-09-16 17:09:49,017] DEBUG [ReplicaStateMachine controllerId=1] Started replica state machine with initial state -> Map() (kafka.controller.ZkReplicaStateMachine)
kafka_1      | [2020-09-16 17:09:49,024] DEBUG [PartitionStateMachine controllerId=1] Started partition state machine with initial state -> Map() (kafka.controller.ZkPartitionStateMachine)
kafka_1      | [2020-09-16 17:09:49,111] INFO [SocketServer brokerId=1] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka_1      | [2020-09-16 17:09:49,116] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Criando um tópico

Para criar um tópico deve-se executar o comando diretamente no container que está rodando o Kafka com o comando docker-compose exec. No exemplo a seguir é criado um tópico com o nome EXEMPLO_TOPICO:

docker-compose exec kafka kafka-topics --create --topic EXEMPLO_TOPICO --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181

Para confirmar que foi criado com sucesso, executamos o seguinte comando:

docker-compose exec kafka kafka-topics --describe --topic EXEMPLO_TOPICO --zookeeper zookeeper:2181

Topic: EXEMPLO_TOPICO   PartitionCount: 1       ReplicationFactor: 1    Configs: 
Topic: EXEMPLO_TOPICO   Partition: 0    Leader: 1       Replicas: 1     Isr: 1

Produzindo uma mensagem com um Producer

Com o tópico criado, é possível enviar mensagens para ele através do comando kafka-console-producer, segue um exemplo para docker:

docker-compose exec kafka bash -c "seq 100 | kafka-console-producer --request-required-acks 1 --broker-list kafka:29092 --topic EXEMPLO_TOPICO && echo 'Produzido 100 mensagens.'"

Funcionando corretamente irá aparece algo como: Produzido 100 mensagens.

Lendo as mensagens com um Consumer

Para ler as mensagens criadas utiliza-se o comando:

docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic EXEMPLO_TOPICO --from-beginning --max-messages 100

Irá aparecer todos os números gerados.

Criando um produtor em Java

Antes de iniciar o código é preciso configurar a dependência kafka-clients no maven:


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

Para criar um producer utilizamos a classe KafkaProducer com o tipo da chave e da mensagem, para o exemplo o tipo da mensagem será String:

var producer=new KafkaProducer<String, String>(properties());

O construtor da classe KafkaProducer recebe um objeto Properties o qual serão passados os parâmetros de configuração do produtor. A seguir o método que constrói o objeto de configuração:

private static Properties properties(){
        var properties=new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka:29092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return properties;
        }

No código apresentado é configurado o endereço do broker para conectar e logo na sequência é configurado o tipo de serialização da chave e valor, que será realizado pelo serializador StringSerializer.class.getName() que converte Strings para bytes.

Ainda é necessário configurar o envio e a mensagem, o que pode ser visto no código a seguir:

 public static void main(String[]args)throws ExecutionException,InterruptedException{
        var producer=new KafkaProducer<String, String>(properties());
        var key="TEMPERATURA";
        var value="34º";
        var record=new ProducerRecord<String, String>("EXEMPLO_TOPICO",key,value);
        Callback callback=(data,ex)->{
        if(ex!=null){
        ex.printStackTrace();
        return;
        }
        System.out.println("Mensagem enviada com sucesso para: "+data.topic()+" | partition "+data.partition()+"| offset "+data.offset()+"| tempo "+data.timestamp());
        };
        producer.send(record,callback).get();
        }

No código apresentado foi configurado um objeto ProduceRecord que irá conter a mensagem e o nome do tópico para publicá-la. O método send do produtor é assíncrono, fazendo necessário a utilização de um objeto Callback para obter informações sobre a operação.

Utilizando um consumidor pelo terminal é possível obter o resultado do envio:

❯ docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic EXEMPLO_TOPICO --from-beginning --max-messages 100
34º

Criando um consumidor em Java

Para criar o consumidor basicamente são os mesmos passos do produtor com poucas mudanças, a primeira é a classe utilizada para representar o consumidor que é a KafkaConsumer. A mesma também recebe um objeto properties de configuração sendo preciso definir o tipo da chave e do valor, conforme foi observado na construção do produtor:

var consumer=new KafkaConsumer<String, String>(properties());

A configuração muda pouco também:

private static Properties properties(){
        var properties=new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,TemperatureControl.class.getName());
        return properties;
        }

Agora como é preciso consumir a mensagem é necessário configurar o desserializador, que irá transformar a mensagem de bytes para o formato desejado. No exemplo é utilizado o StringDeserializer para converter a mensagem em String, para o consumidor sempre será necessário informar o ID do grupo o qual ele pertence, para isso deve-se configurar a propriedade ConsumerConfig.GROUP_ID_CONFIG e definir um nome.

Para ouvir a mensagem é necessário realizar a assinatura no tópico para obter os registros, segue o código que simula o consumo e processamento de uma mensagem:

public static void main(String[]args){
        var consumer=new KafkaConsumer<String, String>(properties());
        consumer.subscribe(Collections.singletonList("EXEMPLO_TOPICO"));

        while(true){
        var records=consumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord<String, String> registro:records){
        System.out.println("------------------------------------------");
        System.out.println("Recebendo nova temperatura");
        System.out.println(registro.key());
        System.out.println(registro.value());

final String valor=registro.value().replaceAll("º","");
final Integer temperatura=Integer.valueOf(valor);
        if(temperatura>30){
        System.out.println("Está calor");
        }else if(temperatura< 20){
        System.out.println("Está frio");
        }

        System.out.println("Temperatura processada.");
        }
        }
        } 

Ao deixar o consumidor executando e realizando mais um envio de mensagem com o produtor criado, o seguinte resultado é obtido:

------------------------------------------
Recebendo nova temperatura
TEMPERATURA
34º
Está calor
Temperatura processada.

Conclusão

A ideia do artigo é fornecer uma base simples para entender os conceitos do Kafka. Espero que seja útil para evoluir os estudos dessa tecnologia para quem lê. Se você quiser acessar o código completo, ele está disponível no meu github.