Pulsar vs Kafka in K8s: Battle of Event Streams

2024-05-07

TL;DR

This tutorial walks you through deploying both Apache Pulsar and Apache Kafka on a local Kubernetes cluster using Kind, writing Go-based producers and consumers for each, load testing with Locust and k6, and monitoring performance with Prometheus and Grafana.

Prerequisites

  • Docker
  • Kind
  • kubectl
  • Helm
  • Go
  • k6
  • Locust

Setting Up the Local Kubernetes Cluster with Kind

cat <<EOF | kind create cluster --name event-streams --config=-
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
  - role: worker
EOF

Verify the cluster is up and running:

kubectl cluster-info --context kind-event-streams

Deploy Apache Kafka (Strimzi)

helm repo add strimzi https://helm-charts.strimzi.io
helm repo update
helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator --namespace kafka --create-namespace
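
The chart installs only the Strimzi operator; the Kafka cluster itself is created by applying a Kafka custom resource. Here is a minimal single-broker, ephemeral-storage sketch (the cluster name my-cluster is an assumption; it yields a bootstrap service at my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092, which the clients below use). Note that newer, KRaft-only Strimzi releases use KafkaNodePool resources instead of the zookeeper section:

cat <<EOF | kubectl apply -n kafka -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF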

kubectl get pods -n kafka

Deploy Apache Pulsar

helm repo add apache https://pulsar.apache.org/charts
helm repo update

# deploy pulsar
kubectl create namespace pulsar
helm install pulsar apache/pulsar --namespace pulsar --set initialize=true

# verify pulsar is up and running
kubectl get pods -n pulsar
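
Pulsar auto-creates topics by default, but you can create the test topic explicitly from the toolset pod the chart deploys (the pod name assumes the release name pulsar used above):

kubectl exec -n pulsar pulsar-toolset-0 -- bin/pulsar-admin topics create persistent://public/default/test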

Kafka Producer and Consumer

We'll use the Confluent Kafka Go client.

Producer

package main

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // Bootstrap address assumes the Strimzi cluster "my-cluster" from the deployment step above.
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer p.Close()

    // The topic must be an addressable variable; Go cannot take the address of a string literal.
    topic := "test"
    for i := 0; i < 10; i++ {
        if err := p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte("Hello Kafka"),
        }, nil); err != nil {
            log.Printf("produce failed: %v", err)
        }
    }

    // Produce is asynchronous; flush pending messages before exiting.
    p.Flush(15 * 1000)
}

Consumer

package main

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        // Bootstrap address assumes the Strimzi cluster "my-cluster" from the deployment step above.
        "bootstrap.servers": "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
        "group.id":          "go-consumer",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    if err := c.SubscribeTopics([]string{"test"}, nil); err != nil {
        log.Fatal(err)
    }

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            log.Printf("Received message: %s", string(msg.Value))
        } else {
            log.Printf("Consumer error: %v (%v)", err, msg)
        }
    }
}
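
Both clients address Kafka by its in-cluster DNS name, and the brokers' advertised listeners also point inside the cluster, so run the binaries in-cluster rather than from your workstation. One way to do that, with hypothetical image and pod names (a Dockerfile for the client is assumed):

# Build the client image and load it into the Kind cluster
docker build -t kafka-go-client:dev .
kind load docker-image kafka-go-client:dev --name event-streams
kubectl run kafka-producer --image=kafka-go-client:dev --image-pull-policy=Never --restart=Never -n kafka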

Pulsar Producer and Consumer

Producer

package main

import (
    "context"
    "github.com/apache/pulsar-client-go/pulsar"
    "log"
)

func main() {
    // "pulsar-broker" is the broker service the Helm chart creates for release name "pulsar".
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://pulsar-broker.pulsar.svc.cluster.local:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "test",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte("Hello Pulsar"),
    }); err != nil {
        log.Fatal(err)
    }
}
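
The synchronous Send call above allows only one in-flight message, which caps throughput. For load generation, pulsar-client-go also offers SendAsync; a minimal sketch of the same producer using it:

package main

import (
    "context"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://pulsar-broker.pulsar.svc.cluster.local:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "test"})
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Queue sends without blocking; the callback reports per-message results.
    for i := 0; i < 10; i++ {
        producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte("Hello Pulsar"),
        }, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
            if err != nil {
                log.Printf("send failed: %v", err)
            }
        })
    }

    // Flush blocks until all buffered messages are delivered.
    if err := producer.Flush(); err != nil {
        log.Fatal(err)
    }
}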

Consumer

package main

import (
    "context"
    "github.com/apache/pulsar-client-go/pulsar"
    "log"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://pulsar-broker.pulsar.svc.cluster.local:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "test",
        SubscriptionName: "go-subscriber",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Receive and acknowledge messages in a loop; with a Shared subscription,
    // messages are distributed across all consumers on "go-subscriber".
    for {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Received message: %s", string(msg.Payload()))
        consumer.Ack(msg)
    }
}

Load Testing with Locust & k6

Locust:

pip3 install locust kafka-python

Create a locustfile.py:

from locust import User, task, between
from kafka import KafkaProducer

class KafkaUser(User):
    wait_time = between(1, 5)

    def on_start(self):
        # Assumes the Strimzi cluster "my-cluster"; this address resolves only inside Kubernetes.
        self.producer = KafkaProducer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092')

    @task
    def send_message(self):
        self.producer.send('test', b'Hello Kafka')

Run Locust:

locust -f locustfile.py

Access the web UI at http://localhost:8089. Note that the in-cluster bootstrap address will not resolve from your workstation; run Locust from inside the cluster or expose Kafka locally first.
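
For unattended load runs, Locust can also run headless with a fixed user count, spawn rate, and duration:

locust -f locustfile.py --headless -u 50 -r 10 --run-time 2m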

k6:

The k6/x/kafka module is provided by the xk6-kafka extension, so build a custom k6 binary with xk6 instead of installing the stock release:

go install go.k6.io/xk6/cmd/xk6@latest
xk6 build --with github.com/mostafa/xk6-kafka@latest

This produces a ./k6 binary in the current directory.

kafka_test.js (using the Writer API from recent xk6-kafka versions; adjust if yours differs):

import { Writer, SchemaRegistry, SCHEMA_TYPE_STRING } from 'k6/x/kafka';

// Assumes the Strimzi cluster "my-cluster"; the address resolves only inside the cluster.
const writer = new Writer({
    brokers: ['my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092'],
    topic: 'test',
});

const schemaRegistry = new SchemaRegistry();

export default function () {
    writer.produce({
        messages: [
            {
                value: schemaRegistry.serialize({
                    data: 'Hello Kafka',
                    schemaType: SCHEMA_TYPE_STRING,
                }),
            },
        ],
    });
}

export function teardown() {
    writer.close();
}

./k6 run kafka_test.js
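
To push more load, scale up virtual users and duration from the command line:

./k6 run --vus 50 --duration 2m kafka_test.js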

Monitoring with Prometheus and Grafana

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo add grafana https://grafana.github.io/helm-charts
helm repo update

kubectl create namespace monitoring

helm install prometheus prometheus-community/prometheus --namespace monitoring
helm install grafana grafana/grafana --namespace monitoring

Retrieve Grafana admin password:

kubectl get secret --namespace monitoring grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo

Port-forward Grafana:

kubectl port-forward svc/grafana -n monitoring 3000:80

Access Grafana at http://localhost:3000.

Configure Dashboards

  • Kafka: Use the JMX Exporter to expose Kafka metrics to Prometheus; with Strimzi this is configured on the Kafka resource, as shown below.
  • Pulsar: Each component exposes metrics at its /metrics endpoint. Ensure Prometheus is scraping these endpoints.
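
For Strimzi, metrics are enabled on the Kafka resource itself; a sketch, assuming a ConfigMap named kafka-metrics that holds JMX exporter rules (the Strimzi examples repository ships a ready-made one):

spec:
  kafka:
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml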

Import relevant dashboards into Grafana to visualize metrics like throughput, latency and resource usage.

Comparative Analysis

After setting up both systems and running load tests:

  • Throughput: Measure messages per second.
  • Latency: Measure the time taken for messages to be processed.
  • Resource Utilization: Monitor CPU and memory usage.

Use Grafana dashboards to compare these metrics side by side for Kafka and Pulsar under similar load conditions.
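
As a starting point, PromQL queries along these lines approximate produce throughput; the exact metric names depend on your exporter rules and chart versions:

# Kafka (JMX exporter rule naming from the Strimzi examples)
sum(rate(kafka_server_brokertopicmetrics_messagesin_total[5m]))

# Pulsar (broker-level message-in rate gauge)
sum(pulsar_rate_in)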
