Kafka, dotnet and SASL_SSL

This is similar to my previous post, only now the question is: how do you connect to a Kafka server from dotnet over SASL_SSL? Like this:

// based on https://github.com/confluentinc/confluent-kafka-dotnet/blob/v1.0.0/examples/Producer/Program.cs

using Confluent.Kafka;
using System;
using System.Threading.Tasks;

namespace Confluent.Kafka.Examples.ProducerExample
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            string topicName = "test-topic";

            var config = new ProducerConfig {
                BootstrapServers = "kafka-server.example.com:19094",
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SslCaLocation = "ca-cert",
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "USERNAME",
                SaslPassword = "PASSWORD",
                Acks = Acks.Leader,
                CompressionType = CompressionType.Lz4,
            };

            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                for (int i = 0; i < 1000000; i++)
                {
                    var message = $"Event {i}";

                    try
                    {
                        // Note: Awaiting the asynchronous produce request
                        // below prevents flow of execution from proceeding
                        // until the acknowledgement from the broker is
                        // received (at the expense of low throughput).

                        var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = null, Value = message } );
                        // Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");

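                        // Alternative: do not await. Throughput goes up, but delivery
                        // errors no longer reach the catch block below, and you need
                        // the producer.Flush call after the loop.
                        // producer.ProduceAsync(topicName, new Message<string, string> { Key = null, Value = message } );
                        // Console.WriteLine($"Event {i} sent.");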
                    }
                    catch (ProduceException<string, string> e)
                    {
                        Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                    }
                }

                // producer.Flush(TimeSpan.FromSeconds(120));

                // Since every ProduceAsync call above is awaited, at this point there are no
                // messages in flight and no delivery reports waiting to be acknowledged, so
                // there is no need to call producer.Flush before disposing the producer.
                // Uncomment the Flush call if you switch to the non-awaited variant.
            }
        }
    }
}
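
If awaiting every single message turns out to be too slow, the non-awaited variant hinted at in the comments could look roughly like the sketch below. It assumes the same placeholder broker, CA file and credentials as above; the class name FireAndForgetProgram and the smaller loop count are made up for illustration. Instead of ProduceAsync it uses Produce with a delivery handler, and it calls Flush before the producer is disposed.

```csharp
using Confluent.Kafka;
using System;

// Hypothetical class name, used only for this sketch.
public class FireAndForgetProgram
{
    public static void Main(string[] args)
    {
        string topicName = "test-topic";

        // Same placeholder connection settings as in the example above.
        var config = new ProducerConfig {
            BootstrapServers = "kafka-server.example.com:19094",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SslCaLocation = "ca-cert",
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "USERNAME",
            SaslPassword = "PASSWORD",
        };

        // Invoked once per message when the broker acknowledges it or delivery fails.
        Action<DeliveryReport<string, string>> handler = report =>
        {
            if (report.Error.IsError)
            {
                Console.WriteLine($"failed to deliver message: {report.Error.Reason} [{report.Error.Code}]");
            }
        };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            for (int i = 0; i < 1000; i++)
            {
                try
                {
                    // Produce only enqueues the message locally and returns immediately.
                    producer.Produce(topicName, new Message<string, string> { Key = null, Value = $"Event {i}" }, handler);
                }
                catch (ProduceException<string, string> e)
                {
                    // Thrown for local errors, for example when the internal queue is full.
                    Console.WriteLine($"failed to enqueue message: {e.Message} [{e.Error.Code}]");
                }
            }

            // Wait for outstanding messages; Flush returns how many are still queued.
            int remaining = producer.Flush(TimeSpan.FromSeconds(120));
            if (remaining > 0)
            {
                Console.WriteLine($"{remaining} messages were not delivered before the timeout.");
            }
        }
    }
}
```

The difference from the awaited version is where errors show up: broker-side failures arrive in the delivery handler, not in the catch block, which only sees errors raised while enqueueing.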

Since I am a total .NET newbie, I usually docker run -it --rm microsoft/dotnet and experiment from there.

Kafka, PHP and SASL_SSL

When you want to connect to a Kafka cluster from PHP, there are numerous examples showing how to use php-rdkafka, but all of them without authentication. So what happens when you need to let a customer connect to your Kafka setup and IP whitelisting is not enough? There is not much easy-to-find information out there.

So let's correct this. Here is the result of combing through various web pages and the librdkafka source code:

<?php

$conf = new RdKafka\Conf();
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('sasl.username', 'USERNAME_HERE');
$conf->set('sasl.password', 'PASSWORD_HERE');
$conf->set('ssl.ca.location', '/usr/local/etc/ca-cert.pem');
$conf->set('ssl.cipher.suites', 'TLSv1.2');

$rk = new RdKafka\Producer($conf);
$rk->addBrokers("SASL_SSL://kafka-1.example.com:19094");
$rk->addBrokers("SASL_SSL://kafka-2.example.com:19094");
$rk->addBrokers("SASL_SSL://kafka-3.example.com:19094");

$topic = $rk->newTopic("kafka-test-topic");

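for ($i = 0; $i < 10; $i++) {
    // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    // serve queued events without blocking
    $rk->poll(0);
}

// wait until all queued messages have been sent before the script exits
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}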

?>

Still, this may not be enough if your Kafka server runs OpenSSL 1.0.2 (CentOS 7, for example) and your PHP client runs OpenSSL 1.1.0 (like the php:7.2-cli Docker image). In that case you need to edit your client’s openssl.cnf and comment out the following line:

;CipherString = DEFAULT@SECLEVEL=2