Kafka, dotnet and SASL_SSL

This is similar to my previous post, only now the question is, how do you connect to a Kafka server using dotnet and SASL_SSL? This is how:

// based on https://github.com/confluentinc/confluent-kafka-dotnet/blob/v1.0.0/examples/Producer/Program.cs

using Confluent.Kafka;
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;


namespace Confluent.Kafka.Examples.ProducerExample
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            string topicName = "test-topic";

            var config = new ProducerConfig {
                BootstrapServers = "kafka-server.example.com:19094",
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SslCaLocation = "ca-cert",
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "USERNAME",
                SaslPassword = "PASSWORD",
                Acks = Acks.Leader,
                CompressionType = CompressionType.Lz4,
            };

            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                for (int i = 0; i < 1000000; i++)
                {
                    var message = $"Event {i}";

                    try
                    {
                        // Note: Awaiting the asynchronous produce request
                        // below prevents flow of execution from proceeding
                        // until the acknowledgement from the broker is
                        // received (at the expense of low throughput).

                        var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = null, Value = message } );
                        // Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");

                        // Let's not await then
                        // producer.ProduceAsync(topicName, new Message<string, string> { Key = null, Value = message } );
                        // Console.WriteLine($"Event {i} sent.");
                    }
                    catch (ProduceException<string, string> e)
                    {
                        Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                    }
                }

                // producer.Flush(TimeSpan.FromSeconds(120));

                // Since we are producing synchronously, at this point there will be no messages
                // in-flight and no delivery reports waiting to be acknowledged, so there is no
                // need to call producer.Flush before disposing the producer.
            }
        }
    }
}

Since I am a total .NET newbie, I usually docker run -it --rm microsoft/dotnet and experiment from there.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s