Active Directory group based authorization for Apache Kafka

written by Sönke Liebau on 2018-03-03

TL;DR

In this blog post I will explain how custom authorization modules can be developed for Kafka without having to recompile Kafka itself. This will be demonstrated with the example of authorization based on Active Directory groups.

All code shown in this post is also available on GitHub. A compiled version for Kafka 1.0.0 is available here.

Big thanks go out to Alex Loddengaard, Lars Francke and Tim Robertson for reviewing this (multiple times).

Introduction

Security in Kafka has come a long way since the early days, when the only option for securing a Kafka cluster was a firewall. Current versions support SSL and SASL for authentication, and the upcoming version 1.1 will also allow obtaining delegation tokens. The latter feature in particular will make life a lot easier for people using Spark Streaming with Kafka. That will be the topic of a future blog post - until then, if you are interested in the reasoning behind delegation tokens, I encourage you to read this article.

Authentication is only half the equation, of course. Once Kafka knows who you are, it needs to figure out what you are allowed to do and see. To make authorization as flexible as possible, Kafka uses a model that allows plugging in custom authorizers. These custom authorizers are defined in the broker configuration and loaded dynamically at runtime, without changing the Kafka code at all.

So if you need to grant access based on moon phases, what you'd do is implement a MoonPhaseAuthorizer that roughly looks like this:

override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
  moon.isFullMoon(System.currentTimeMillis())
}

Then you'd put this in the classpath of the Kafka brokers, configure the broker setting authorizer.class.name and relax in the knowledge that your data is safe.

Not everybody has very specific security needs like these though, so in order to serve the default use case Kafka comes with a reference implementation: SimpleAclAuthorizer. This enables basic authorization (more details follow further down). There are of course no definitive numbers, but in my experience 99% of people using Kafka with security enabled are happily using the SimpleAclAuthorizer. And when I say happily I mean that there are of course gripes about missing functionality, but overall it works and there is no need to go down the route of rolling your own.

The complaints I hear most often are:

  • No wildcard support for resources or principal names beyond "match all"
  • No support for IP ranges
  • No support for group based ACLs

It is that last point that we will pick up today to demonstrate implementing a custom authorizer class that retrieves a user's groups from Active Directory and allows creating ACLs that use these.

Implementation

The following will require a basic understanding of Kafka ACLs and how to manage them, so at this point I would recommend at least skimming the official or Confluent documentation if you are not familiar with this topic.

For the SimpleAclAuthorizer the only principal that can be used in ACLs is the username. Its format differs depending on how the user authenticated when connecting to Kafka:

Authentication method    User Principal
PLAINTEXT                ANONYMOUS
SSL                      Information is taken from the certificate the client presented: CN=username;OU=...;DN=...
SASL                     Depends on the SASL mechanism used, for Kerberos it would usually be a user principal in the form of user@COMPANY.COM

This approach works well if only a limited number of people need access to Kafka, but it has obvious shortcomings for large teams. In larger organizations or enterprises, access control will usually be based on group memberships in a central Active Directory. There are a few good reasons for this:

  • if all members of a team need the same access to multiple systems all these systems can be configured to grant access to that team's group, so new team members need only be added to one group, instead of ACLs created in ten different systems
  • creating new resources to which the entire team needs access only requires one ACL to be created instead of one ACL for every user that needs access (and you always forget one user!)
  • for large organizations there will usually be one central department responsible for granting access to systems based on service requests in Service Manager or something similar - for them it is much easier to add someone to a group in AD than to log onto a Kafka node, kinit with a broker's keytab and issue kafka-acls --add --allow-principal new-user@COMPANY.COM --topic * -- ..... - especially if you consider that they'd have to know the equivalent process for probably a few dozen other systems as well

I think it is safe to say that this is a feature worth having. There is also KAFKA-2794 for this, but it hasn't seen much activity for a while. So let's see how we can implement this on our own for now.

A word of caution before we start!
The code shown here and provided in the GitHub repository is really just a proof of concept to test whether this would work in a simple way. This should in no case be let near any production system without extensive testing and additional error and fringe case handling!

The following diagram shows the (somewhat simplified) data flow that happens internally when Kafka authorizes a request:

The architecture of the Kafka demo

A DefaultKafkaPrincipalBuilder gets passed the context of the request, which, depending on your chosen form of authentication will either be a Plaintext, SSL or SASL context object. The builder now creates a KafkaPrincipal object from this context which contains the type of principal and the extracted value. For the DefaultKafkaPrincipalBuilder this type will always be "User", the value will vary, as we have seen in the table above.

The SimpleAclAuthorizer is now passed this KafkaPrincipal along with the details of the request to be authorized. It has a buffered list of defined ACLs and uses these to evaluate whether or not to grant the request.

As I mentioned at the start, all of this is pluggable - the DefaultKafkaPrincipalBuilder implements the KafkaPrincipalBuilder interface and the SimpleAclAuthorizer the Authorizer interface. By doing the same we can build our own implementations for these classes and configure Kafka to use those instead of the defaults shown above.

Let's take a step back and look at what we need for our endeavor:

  1. look up AD groups for a user
  2. create & manage ACLs that include group membership
  3. add groups to the KafkaPrincipal
  4. apply ACLs

Active Directory Lookup

Item 1 seems like a large piece of work, but luckily someone else has already done all the heavy lifting here as part of the Apache Hadoop project. Hadoop-common contains the GroupMappingServiceProvider interface, which allows creating custom classes that map users to groups. There are a couple of implementations in the project, two of which are of special interest to us:

  • LdapGroupsMapping
  • ShellBasedUnixGroupsMapping

While the first one sounds tempting for our use case the recommendation is to keep away from it for a variety of reasons. Mostly though, this class has to duplicate a lot of functionality that is better handled in SSSD (see below).

In our case there is an additional reason not to use LdapGroupsMapping though. Kafka does not currently offer a way to pass configuration options to a PrincipalBuilder, so we would need to hardcode all required settings like LDAP URL, user etc. in the source code.

For now we will stick with the shell-based mapper, which means that AD groups will need to be synchronized to local Unix groups on the brokers. There are quite a few tools out there to do that, SSSD and Centrify being two commonly used ones. Setting up this integration is beyond the scope of this post, but there are a lot of good resources on the web that explain what needs to be done.
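To give a rough idea of what a shell-based lookup boils down to, here is a minimal sketch in plain Java. It is not Hadoop's implementation: ShellGroupLookup and its methods are hypothetical names, and I am assuming that asking the operating system via `id -Gn <user>` is a fair approximation of what such a mapper does.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Hadoop or Kafka.
final class ShellGroupLookup {

  // Splits the whitespace-separated output of `id -Gn` into group names.
  static List<String> parseGroups(String idOutput) {
    List<String> groups = new ArrayList<>();
    for (String token : idOutput.trim().split("\\s+")) {
      if (!token.isEmpty()) {
        groups.add(token);
      }
    }
    return groups;
  }

  // Runs `id -Gn <user>` and returns the groups the OS reports for that user.
  static List<String> lookup(String user) throws IOException, InterruptedException {
    Process process = new ProcessBuilder("id", "-Gn", user)
        .redirectErrorStream(true)
        .start();
    String firstLine;
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
      firstLine = reader.readLine();
    }
    if (process.waitFor() != 0 || firstLine == null) {
      throw new IOException("Could not resolve groups for user " + user);
    }
    return parseGroups(firstLine);
  }

  public static void main(String[] args) throws Exception {
    // Looks up the groups of the user running this JVM.
    System.out.println(lookup(System.getProperty("user.name")));
  }
}
```

Note that splitting on whitespace breaks for group names that contain spaces, which is not unusual for AD groups, so a real setup would need to handle that (for example by configuring the sync tool accordingly).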

Manage ACLs

In order to authorize against ACLs based on group information we need some way to create and manage these ACLs - and again I have good news, the existing ACL management tool already offers the functionality that we need.

A common command to create a user based access rule might look like this:

$ bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
   --allow-principal User:Bob --allow-host 198.51.100.0 \
   --allow-host 198.51.100.1 --operation Read --operation Write --topic test

When you run this command, an instance of SimpleAclAuthorizer is created and used to store the ACL. In the case of SimpleAclAuthorizer this means that it ends up in ZooKeeper, which can easily be checked with the ZooKeeper shell:

$ zookeeper-shell 127.0.0.1:2181 <<< "get /kafka-acl/Topic/test"
Connecting to 127.0.0.1:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"version":1,"acls":[{"principal":"User:Bob","permissionType":"Allow",
"operation":"Read","host":"198.51.100.0"},{"principal":"User:Bob",
"permissionType":"Allow","operation":"Read","host":"198.51.100.1"},
{"principal":"User:Bob","permissionType":"Allow","operation":"Write",
"host":"198.51.100.0"},{"principal":"User:Bob","permissionType":"Allow",
"operation":"Write","host":"198.51.100.1"}]}
cZxid = 0x2f
ctime = Fri Feb 23 16:07:58 CET 2018
mZxid = 0x2f
mtime = Fri Feb 23 16:07:58 CET 2018
pZxid = 0x2f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 388
numChildren = 0

As you can see, the ACL is stored in JSON representation and the principal we specified is stored as a string in that JSON object. When the Authorizer later retrieves this ACL it will split the principal at the colon: the first part becomes the principal type, the second part becomes the principal name. What this means for us is that it is ok to simply pass in "Group:users" as a principal, and this will later be available to us for authorization without having to write a single line of code.
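The splitting of a stored principal string can be sketched in a few lines of plain Java. PrincipalString is a hypothetical stand-in and not a Kafka class; I am also assuming the split happens at the first colon only, so that names which themselves contain colons stay intact.

```java
// Hypothetical stand-in for illustration; Kafka ships its own parsing logic.
final class PrincipalString {
  final String type;
  final String name;

  private PrincipalString(String type, String name) {
    this.type = type;
    this.name = name;
  }

  // Splits "Type:name" at the first colon into principal type and principal name.
  static PrincipalString parse(String value) {
    int idx = (value == null) ? -1 : value.indexOf(':');
    if (idx <= 0 || idx == value.length() - 1) {
      throw new IllegalArgumentException("Expected <type>:<name> but got: " + value);
    }
    return new PrincipalString(value.substring(0, idx), value.substring(idx + 1));
  }

  public static void main(String[] args) {
    PrincipalString p = parse("Group:users");
    System.out.println(p.type + " / " + p.name); // prints "Group / users"
  }
}
```

Nothing in this logic cares whether the type is "User" or "Group", which is why the existing tooling can store group ACLs unchanged.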

Build a principal that includes group information

Alright, this is where the fun starts. So far, we have found that a good chunk of our work was already done by other people (which is nice, don't get me wrong!), but now we finally get to write some code!

We will need to implement the KafkaPrincipalBuilder interface and create a class that uses an instance of the Hadoop group mapper to look up groups for the authenticated user. These groups then need to be stored in the KafkaPrincipal that is used for authorization later on.

By default the KafkaPrincipal only allows storing two pieces of information:

  • type of the principal (e.g. User)
  • value (e.g. the username)

So if we use the stock KafkaPrincipal object we'd lose the username and only be able to store one group name. To get around this limitation we will extend KafkaPrincipal as well. By adding a List<KafkaPrincipal> property we are able to store multiple groups in addition to the username.

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

// A KafkaPrincipal that additionally carries the principals (user and groups) resolved for it
public class ComplexKafkaPrincipal extends KafkaPrincipal {

  protected List<KafkaPrincipal> additionalPrincipals = new ArrayList<>();

  public ComplexKafkaPrincipal(String principalType, String name) {
      super(principalType, name);
  }

  // Wraps an existing principal, e.g. the one created by the default builder
  public ComplexKafkaPrincipal(KafkaPrincipal kafkaPrincipal) {
      this(kafkaPrincipal.getPrincipalType(), kafkaPrincipal.getName());
  }

  public ComplexKafkaPrincipal(String principalType, String name, List<KafkaPrincipal> additionalPrincipals) {
      this(principalType, name);
      this.additionalPrincipals = additionalPrincipals;
  }

  public List<KafkaPrincipal> getGroupMemberships() {
      return additionalPrincipals;
  }
}

To make use of this extended Principal we need a PrincipalBuilder that knows about this list and uses it to store information. The next listing shows the relevant part of this class. It is a bit larger than shown here with some boilerplate code added, but the main functionality that was discussed so far can be found in this snippet:

public class HadoopGroupMappingPrincipalBuilder implements KafkaPrincipalBuilder, Configurable {
  private GroupMappingServiceProvider groupMapper;
  private DefaultKafkaPrincipalBuilder principalBuilder;

  @Override
  public KafkaPrincipal build(AuthenticationContext context) {
    // Create a base principal by using the DefaultPrincipalBuilder
    ComplexKafkaPrincipal basePrincipal = new ComplexKafkaPrincipal(principalBuilder.build(context));

    // Resolve username based on what kind of AuthenticationContext the request has
    // and perform groups lookup
    if (context instanceof SaslAuthenticationContext) {
      basePrincipal.additionalPrincipals = getGroups(basePrincipal.getName());
    } else if (context instanceof SslAuthenticationContext) {
      basePrincipal.additionalPrincipals = getGroups(getUserFromCertificate(basePrincipal.getName()));
    }
    return basePrincipal;
  }

  private List<KafkaPrincipal> getGroups(String userName) {
    List<KafkaPrincipal> groupPrincipals = new ArrayList<>();
    try {
      // Add user principal to list as well to make later matching easier
      groupPrincipals.add(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName));

      principalLogger.fine("Resolving groups for user: " + userName);
      List<String> groups = groupMapper.getGroups(userName);
      principalLogger.fine("Got list of groups for user " + userName + ": " + Utils.join(groups, ", "));
      for (String group : groups) {
        groupPrincipals.add(new KafkaPrincipal("Group", group));
      }
    } catch (IOException e) {
      principalLogger.warning("Groups for user " + userName +
          " could not be resolved, proceeding with authorization based on username only.");
    }
    return groupPrincipals;
  }

}

As mentioned earlier, the class also contains code that will only become useful after Kafka is extended to allow passing configuration parameters to a PrincipalBuilder implementation. I plan to raise a pull request to add this functionality to Kafka. This would allow using any implementation of Hadoop's GroupMappingServiceProvider to look up user groups.

I am aware that this is not a feasible approach as it currently stands, since we are pulling in the entire hadoop-common library - which is not small - just to run a few shell commands. If this turns into a more serious effort to integrate the functionality in Kafka I'll look into refactoring this to only pull the relevant pieces in.

Build an authorizer that evaluates the information our PrincipalBuilder added

At this point the foundations are in place and all that remains is to implement the actual authorizer. Ideally I would have liked to extend the existing SimpleAclAuthorizer and override only a select few methods to maximise code reuse. However, a large part of the functionality is scoped as private, so the class doesn't really lend itself well to subclassing.

So for now I have resorted to simply copying SimpleAclAuthorizer and making the necessary changes, the meat of which is shown in the following snippet:

// Requires: import scala.collection.JavaConverters._
private def aclMatch(operations: Operation, resource: Resource, principal: KafkaPrincipal,
                     host: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = {
  // Build a list of all principals that need to be checked against the ACLs
  val allPrincipals: List[KafkaPrincipal] = principal match {
    // A ComplexKafkaPrincipal carries the user and all of its groups as a java.util.List,
    // which has to be converted (not cast) to a Scala List
    case complex: ComplexKafkaPrincipal => complex.getGroupMemberships.asScala.toList
    // A plain KafkaPrincipal was passed
    case plain => List(new KafkaPrincipal(plain.getPrincipalType, plain.getName))
  }

  // Match principals against ACLs
  allPrincipals
    .map(p => singleAclMatch(operations, resource, p, host, permissionType, acls))
    .foldLeft(false)(_ || _)
}

What this does is extract the inner list of KafkaPrincipals from the ComplexKafkaPrincipal and match each of them individually against the list of stored ACLs - first for ones denying the action, then for ones allowing the action. If any one principal from the list matches any ACL this is considered an overall match. So if there is no ACL that allows "User:sliebau" to write to the topic test, but "Group:supergroup", which he is a member of, is allowed to write, then his request will be granted.
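The evaluation order matters here, so it may help to see it in isolation. The following is a simplified sketch in plain Java with stand-in types (GroupAclSketch, Acl and Permission are hypothetical and not Kafka classes); it ignores hosts, resources and super users and only models "check Deny ACLs first, then Allow ACLs, across all principals of a user".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for illustration only; none of these are Kafka classes.
final class GroupAclSketch {

  enum Permission { ALLOW, DENY }

  static final class Acl {
    final String principal;      // e.g. "User:sliebau" or "Group:supergroup"
    final String operation;      // e.g. "Write"
    final Permission permission;

    Acl(String principal, String operation, Permission permission) {
      this.principal = principal;
      this.operation = operation;
      this.permission = permission;
    }
  }

  // True if any of the principals has an ACL of the given permission type for the operation.
  static boolean aclMatch(List<String> principals, String operation,
                          Permission permission, List<Acl> acls) {
    for (String principal : principals) {
      for (Acl acl : acls) {
        if (acl.permission == permission
            && acl.operation.equals(operation)
            && acl.principal.equals(principal)) {
          return true;
        }
      }
    }
    return false;
  }

  // Deny ACLs are checked first and win over any Allow ACL.
  static boolean authorize(List<String> principals, String operation, List<Acl> acls) {
    if (aclMatch(principals, operation, Permission.DENY, acls)) {
      return false;
    }
    return aclMatch(principals, operation, Permission.ALLOW, acls);
  }

  public static void main(String[] args) {
    List<String> principals = Arrays.asList("User:sliebau", "Group:supergroup");
    List<Acl> acls = new ArrayList<>();
    acls.add(new Acl("Group:supergroup", "Write", Permission.ALLOW));
    System.out.println(authorize(principals, "Write", acls)); // true

    acls.add(new Acl("User:sliebau", "Write", Permission.DENY));
    System.out.println(authorize(principals, "Write", acls)); // false
  }
}
```

One consequence of this ordering is that a Deny ACL on any single principal, whether the user or one of the groups, overrides Allow ACLs on all the others.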

Deploying

To deploy this authorizer we build a jar file from the project - all dependencies aside from hadoop-common are scoped as provided, since the code will be run within the Kafka JVM where everything that is needed should already be on the classpath. If this is deployed to a Kafka cluster running as part of the larger Hadoop stack you could also mark hadoop-common as provided and add the existing Hadoop libs to the classpath instead.

After creating the .jar file, upload it to all machines on your (security enabled) cluster and set the following options in your broker configuration:

authorizer.class.name=com.opencore.kafka.ComplexAclAuthorizer
principal.builder.class=com.opencore.kafka.HadoopGroupMappingPrincipalBuilder

and add the jar to your CLASSPATH before starting your Kafka brokers.

export CLASSPATH=/path/to/your/file.jar
./kafka-server-start.sh ../config/kafka.properties

Let's look at an example of what effect this has on authorization. I have created two topics, test and test2, and set permissions for the user sliebau on test and for a group that he is a member of on test2:

./kafka-acls.sh --list --authorizer-properties zookeeper.connect=10.0.0.5:2181/kafka_1.0
Current ACLs for resource `Topic:test`: 
    User:sliebau@OPENCORE.COM has Allow permission for operations: Write from hosts: *
    User:sliebau@OPENCORE.COM has Allow permission for operations: Describe from hosts: * 

Current ACLs for resource `Topic:test2`: 
    group:supergroup has Allow permission for operations: Describe from hosts: *
    group:supergroup has Allow permission for operations: Write from hosts: * 

Current ACLs for resource `Cluster:kafka-cluster`: 
    group:supergroup has Allow permission for operations: Create from hosts: *
    User:sliebau@OPENCORE.COM has Allow permission for operations: Create from hosts: *

I now authenticate as sliebau and try to produce to both topics in turn. Authentication in this context can be either SASL or SSL based. For SSL the username is extracted from the Common Name of the certificate presented by the client. For SASL it depends on the actual implementation, but in principle the default behavior of Kafka is used.
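For the SSL case, the extraction of the Common Name could look roughly like the following sketch. CertificateUser is a hypothetical helper and not the code from the repository; I am assuming the principal name is a comma-separated distinguished name that the JDK's LdapName class can parse.

```java
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;

// Hypothetical helper for illustration; not taken from the repository.
final class CertificateUser {

  // Returns the CN value of a distinguished name, or the input unchanged if no CN can be found.
  static String commonName(String distinguishedName) {
    try {
      for (Rdn rdn : new LdapName(distinguishedName).getRdns()) {
        if ("CN".equalsIgnoreCase(rdn.getType())) {
          return rdn.getValue().toString();
        }
      }
    } catch (InvalidNameException e) {
      // Not a parseable distinguished name, fall through and use the value as-is
    }
    return distinguishedName;
  }

  public static void main(String[] args) {
    System.out.println(commonName("CN=sliebau,OU=Engineering,O=OpenCore,C=DE")); // prints "sliebau"
  }
}
```

The resulting short name is what would then be handed to the group mapper, so it has to match the Unix username known to the broker hosts.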

The authorizer log shows that both operations are allowed, however based on different ACLs:

# kafka-console-producer.sh --topic test --...
[2018-02-20 17:35:54,999] DEBUG Principal = User:sliebau@OPENCORE.COM is Allowed Operation = Describe from 
host = 10.0.0.9 on resource = Topic:test (kafka.authorizer.logger)
[2018-02-20 17:35:56,213] DEBUG operation = Write on resource = Topic:test from 
host = 10.0.0.9 is Allow based on acl = User:sliebau@OPENCORE.COM has Allow permission for operations: Write from 
hosts: * (kafka.authorizer.logger)
[2018-02-20 17:35:56,213] DEBUG Principal = User:sliebau@OPENCORE.COM is Allowed Operation = Write from 
host = 10.0.0.9 on resource = Topic:test (kafka.authorizer.logger)

# kafka-console-producer.sh --topic test2 --...
[2018-02-20 17:36:11,388] DEBUG Principal = User:sliebau@OPENCORE.COM is Allowed Operation = Describe from 
host = 10.0.0.9 on resource = Topic:test2 (kafka.authorizer.logger)
[2018-02-20 17:36:12,457] DEBUG operation = Write on resource = Topic:test2 from 
host = 10.0.0.9 is Allow based on acl = Group:supergroup has Allow permission for operations: Write from 
hosts: * (kafka.authorizer.logger)
[2018-02-20 17:36:12,457] DEBUG Principal = User:sliebau@OPENCORE.COM is Allowed Operation = Write from 
host = 10.0.0.9 on resource = Topic:test2 (kafka.authorizer.logger)

So there you have it, group based ACLs in Kafka with minimal effort.

The code

If you want to play around with this yourself, the full code can be found on GitHub. All code in this repository is based on Kafka 1.0.0 and won't work with earlier versions without modifications. The entire API in this area of the code was changed heavily in KIP-189.

The PrincipalBuilder would, in theory, allow using any implementation of Hadoop's GroupMappingServiceProvider by configuring this in the Kafka broker config. As already mentioned configuring this is not currently possible. So this class is currently more of an academic exercise that may or may not become useful at some point.

I have built and released a version 0.1 of this code, which you should be able to stick in your Kafka classpath and get going right away, if you so choose. But please keep in mind my word of caution above about using this in production!

You can download the compiled jar file here.

