In this blog post I will explain how custom authorization modules can be developed for Kafka without having to recompile Kafka itself. This will be demonstrated with the example of authorization based on Active Directory groups.
All code shown in this post is also available on GitHub. A compiled version for Kafka 1.0.0 is available here.
Security in Kafka has come a long way since the early days, when the only option for securing a Kafka cluster was to put a firewall in front of it. Current versions support SSL and SASL for authentication, and the upcoming version 1.1 will also allow obtaining delegation tokens. Especially the latter feature will make life a lot easier for people using Spark Streaming with Kafka. But this will be the topic of a future blog post - until then, if you are interested in the reasoning behind delegation tokens, I encourage you to read this article.
Authentication is only half the equation of course. After Kafka knows who you are, it needs to figure out what you are allowed to do and see. To make authorization as flexible as possible, a model that allows the creation of custom authorizers was chosen. These custom authorizers can then be specified in the configuration and are dynamically loaded at runtime, without changing the Kafka code at all.
So if you need to grant access based on moon phases, what you'd do is implement a MoonPhaseAuthorizer that roughly looks like this:

```scala
override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
  moon.isFullMoon(System.currentTimeMillis())
}
```
Then you'd put this on the classpath of the Kafka brokers, configure the broker setting authorizer.class.name, and relax in the knowledge that your data is safe.
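Assuming the class lives in a package like com.example (a made-up name for this sketch), the broker configuration would simply be:

```
authorizer.class.name=com.example.MoonPhaseAuthorizer
```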
Not everybody has very specific security needs like these though, so in order to serve the default use case Kafka comes with a reference implementation: SimpleAclAuthorizer. This enables basic authorization (more details follow further down). There are of course no definitive numbers, but in my experience 99% of people using Kafka with security enabled are happily using the SimpleAclAuthorizer. And when I say happily, I mean that there are of course gripes about missing functionality, but overall it works and there is no need to go down the route of rolling your own.
The complaint I hear most often is that ACLs can only be defined for individual users, not for groups. It is this point that we will pick up today to demonstrate implementing a custom authorizer class that retrieves a user's groups from Active Directory and allows creating ACLs that use these.
The following will require a basic understanding of Kafka ACLs and how to manage them, so at this point I would recommend at least skimming the official or Confluent documentation if you are not familiar with this topic.
For the SimpleAclAuthorizer the only principal that can be used in ACLs is the username. The format of this differs depending on how the user authenticated when connecting to Kafka:
| Authentication method | User Principal |
| --- | --- |
| PLAINTEXT | ANONYMOUS |
| SSL | Taken from the certificate the client presented: CN=username,OU=...,O=... |
| SASL | Depends on the SASL mechanism used; for Kerberos it would usually be a user principal in the form user@COMPANY.COM |
This approach works well if only a limited number of people need access to Kafka, but it has obvious shortcomings for large teams. For larger organizations or enterprises, access control will usually be based on group memberships in a central Active Directory, and for good reason: nobody wants to manually run

```
kafka-acls --add --allow-principal new-user@COMPANY.COM --topic * -- .....
```

for every new user - especially if you consider that they'd have to know the equivalent process for probably a few dozen other systems as well.

I think it is safe to say that this is a feature that would be worth having, and there is also KAFKA-2794 for it, which hasn't seen much activity for a while though. So let's see how we can implement this on our own for now.
The following diagram shows the (somewhat simplified) data flow that happens internally when Kafka authorizes a request:
A DefaultKafkaPrincipalBuilder gets passed the context of the request, which, depending on your chosen form of authentication, will either be a plaintext, SSL, or SASL context object. The builder then creates a KafkaPrincipal object from this context, which contains the type of principal and the extracted value. For the DefaultKafkaPrincipalBuilder this type will always be "User"; the value will vary, as we have seen in the table above.
The SimpleAclAuthorizer is then passed this KafkaPrincipal along with the details of the request to be authorized. It keeps a cached list of defined ACLs and uses these to evaluate whether or not to grant the request.
As I mentioned at the start, all of this is pluggable: the DefaultKafkaPrincipalBuilder implements the KafkaPrincipalBuilder interface and the SimpleAclAuthorizer the Authorizer interface. By doing the same we can build our own implementations of these classes and configure Kafka to use those instead of the defaults shown above.
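For reference, the KafkaPrincipalBuilder interface (introduced by KIP-189) is minimal; to the best of my knowledge, this is its shape in Kafka 1.0.0:

```java
// Sketch of org.apache.kafka.common.security.auth.KafkaPrincipalBuilder (Kafka 1.0.0)
public interface KafkaPrincipalBuilder {
    // Build a KafkaPrincipal from the authentication context of the current connection
    KafkaPrincipal build(AuthenticationContext context);
}
```

The Authorizer counterpart is a Scala trait (kafka.security.auth.Authorizer) that, in addition to the authorize method, also declares methods for adding, removing, and listing ACLs.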
Let's take a step back and look at what we need for our endeavor:

1. A way to look up the group memberships of a user
2. A way to define ACLs that use groups as principals
3. A PrincipalBuilder that attaches the group information to the principal it creates
4. An Authorizer that checks requests against all groups of a user

Item 1 seems like a large piece of work, but luckily someone else has already done all the heavy lifting here as part of the Apache Hadoop project. Hadoop-common contains the GroupMappingServiceProvider interface to allow creating custom classes which map users to groups. There are a couple of implementations in the project, two of which are of special interest to us:

- LdapGroupsMapping, which looks up a user's groups directly in an LDAP server such as Active Directory
- ShellBasedUnixGroupsMapping, which shells out to the operating system to resolve the local Unix groups of a user
While the first one sounds tempting for our use case the recommendation is to keep away from it for a variety of reasons. Mostly though, this class has to duplicate a lot of functionality that is better handled in SSSD (see below).
In our case there is an additional reason not to use LdapGroupsMapping though: Kafka does not currently offer a way to pass configuration options to a PrincipalBuilder, so we would need to hardcode all required settings like the LDAP URL, user, etc. in the source code.
For now we will stick with the shell-based mapper, which means that AD groups will need to be synchronized to local Unix groups. There are quite a few tools out there to do that, SSSD and Centrify being two commonly used ones. Setting up this integration is beyond the scope of this post, but there are a lot of good resources on the web that explain what needs to be done.
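For reference, the GroupMappingServiceProvider interface that both implementations (and our code later on) are built around is pleasantly small. A sketch of its shape in hadoop-common 2.x (signatures from memory, check the Hadoop source for details):

```java
// Sketch of org.apache.hadoop.security.GroupMappingServiceProvider (hadoop-common 2.x)
public interface GroupMappingServiceProvider {
    // Look up all groups the given user is a member of
    List<String> getGroups(String user) throws IOException;

    // Refresh any cached group information
    void cacheGroupsRefresh() throws IOException;

    // Warm up the cache for the given list of groups
    void cacheGroupsAdd(List<String> groups) throws IOException;
}
```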
In order to authorize against ACLs based on group information we need some way to create and manage these ACLs - and again I have good news, the existing ACL management tool already offers the functionality that we need.
A common command to create a user-based access rule might look like this:
```
$ bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
    --allow-principal User:Bob --allow-host 198.51.100.0 \
    --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
```
What happens when you run this command is that an instance of SimpleAclAuthorizer is created, which is then used to store the ACL. In the case of the SimpleAclAuthorizer this means that it ends up in ZooKeeper, which can easily be checked with the ZooKeeper shell:
```
$ zookeeper-shell 127.0.0.1:2181 <<< "get /kafka-acl/Topic/test"
Connecting to 127.0.0.1:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"version":1,"acls":[{"principal":"User:Bob","permissionType":"Allow","operation":"Read","host":"198.51.100.0"},{"principal":"User:Bob","permissionType":"Allow","operation":"Read","host":"198.51.100.1"},{"principal":"User:Bob","permissionType":"Allow","operation":"Write","host":"198.51.100.0"},{"principal":"User:Bob","permissionType":"Allow","operation":"Write","host":"198.51.100.1"}]}
cZxid = 0x2f
ctime = Fri Feb 23 16:07:58 CET 2018
mZxid = 0x2f
mtime = Fri Feb 23 16:07:58 CET 2018
pZxid = 0x2f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 388
numChildren = 0
```
As you can see, the ACL is stored in JSON representation and the principal we specified is stored as a string in that JSON object. When the Authorizer later retrieves this ACL it will split the principal at the colon: the first part becomes the principal type, the second part becomes the principal name. What this means for us is that it is perfectly fine to simply pass in "Group:users" as a principal, and this will later be available to us for authorization - without having to write a single line of code.
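So creating a group-based ACL with the stock tooling could look like this (a sketch following the same pattern as the user-based command above, assuming a group named users):

```
$ bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
    --allow-principal Group:users --operation Read --topic Test-topic
```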
Alright, this is where the fun starts. So far, we have found that a good chunk of our work was already done by other people (which is nice, don't get me wrong!), but now we finally get to write some code!
We will need to implement the KafkaPrincipalBuilder interface and create a class that uses an instance of the Hadoop group mapper to look up groups for the authenticated user. These groups then need to be stored in the KafkaPrincipal that is used for authorization later on.
By default the KafkaPrincipal only allows storing two pieces of information:

- the principal type (e.g. "User")
- the principal name
So if we use the stock KafkaPrincipal object we'd lose the username and only be able to store one group name. To get around this limitation we will extend KafkaPrincipal as well: by adding a List&lt;KafkaPrincipal&gt; property we are able to store multiple groups in addition to the username.
```java
public class ComplexKafkaPrincipal extends KafkaPrincipal {
    protected List<KafkaPrincipal> additionalPrincipals = new ArrayList<>();

    public ComplexKafkaPrincipal(String principalType, String name) {
        super(principalType, name);
    }

    public ComplexKafkaPrincipal(KafkaPrincipal kafkaPrincipal) {
        this(kafkaPrincipal.getPrincipalType(), kafkaPrincipal.getName());
    }

    public ComplexKafkaPrincipal(String principalType, String name, List<KafkaPrincipal> additionalPrincipals) {
        this(principalType, name);
        this.additionalPrincipals = additionalPrincipals;
    }

    public List<KafkaPrincipal> getGroupMemberships() {
        return additionalPrincipals;
    }
}
```
To make use of this extended principal we need a PrincipalBuilder that knows about this list and uses it to store the group information. The next listing shows the relevant part of this class. The full class is a bit larger, with some boilerplate code that is omitted here, but the main functionality discussed so far can be found in this snippet:
```java
public class HadoopGroupMappingPrincipalBuilder implements KafkaPrincipalBuilder, Configurable {
    private GroupMappingServiceProvider groupMapper;
    private DefaultKafkaPrincipalBuilder principalBuilder;

    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        // Create a base principal by using the DefaultKafkaPrincipalBuilder
        ComplexKafkaPrincipal basePrincipal = new ComplexKafkaPrincipal(principalBuilder.build(context));

        // Resolve the username based on what kind of AuthenticationContext
        // the request has and perform the groups lookup
        if (context instanceof SaslAuthenticationContext) {
            basePrincipal.additionalPrincipals = getGroups(basePrincipal.getName());
        } else if (context instanceof SslAuthenticationContext) {
            basePrincipal.additionalPrincipals = getGroups(getUserFromCertificate(basePrincipal.getName()));
        }
        return basePrincipal;
    }

    private List<KafkaPrincipal> getGroups(String userName) {
        List<KafkaPrincipal> groupPrincipals = new ArrayList<>();
        try {
            // Add the user principal to the list as well to make later matching easier
            groupPrincipals.add(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, userName));

            principalLogger.fine("Resolving groups for user: " + userName);
            List<String> groups = groupMapper.getGroups(userName);
            principalLogger.fine("Got list of groups for user " + userName + ": " + Utils.join(groups, ", "));

            for (String group : groups) {
                groupPrincipals.add(new KafkaPrincipal("Group", group));
            }
        } catch (IOException e) {
            principalLogger.warning("Groups for user " + userName
                + " could not be resolved, proceeding with authorization based on username only.");
        }
        return groupPrincipals;
    }
}
```
I am aware that this is not a feasible approach as it currently stands, since we are pulling in the entire hadoop-common library - which is not small - just to run a few shell commands. If this turns into a more serious effort to integrate the functionality into Kafka, I'll look into refactoring this to only pull in the relevant pieces.
At this point the foundations are in place and all that remains is to implement the actual authorizer. Ideally I would have liked to extend the existing SimpleAclAuthorizer and override only a select few methods to maximise code reuse. However, a large part of the functionality is scoped as private, so the class doesn't really lend itself well to subclassing. So for now I have resorted to simply copying the SimpleAclAuthorizer and making the necessary changes, the meat of which is shown in the following snippet:
```scala
// needs: import scala.collection.JavaConverters._
private def aclMatch(operations: Operation, resource: Resource, principal: KafkaPrincipal,
                     host: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = {
  // Build a list of all principals contained in this principal
  var allPrincipals = List[KafkaPrincipal]()
  if (principal.isInstanceOf[ComplexKafkaPrincipal]) {
    // A ComplexKafkaPrincipal was passed, use the contained list of user and group principals
    allPrincipals = principal.asInstanceOf[ComplexKafkaPrincipal].getGroupMemberships.asScala.toList
  } else {
    // A plain KafkaPrincipal was passed
    allPrincipals ::= new KafkaPrincipal(principal.getPrincipalType, principal.getName)
  }

  // Match each principal individually against the ACLs
  allPrincipals
    .map(p => singleAclMatch(operations, resource, p, host, permissionType, acls))
    .foldLeft(false)(_ || _)
}
```
What this does is extract the inner list of KafkaPrincipals from the ComplexKafkaPrincipal and match each of them individually against the list of stored ACLs - first against ones denying the action, then against ones allowing it. If any one principal from the list matches any ACL, this is considered an overall match. So if "User:sliebau" is not allowed to write to the topic test, but "Group:supergroup", which he is a member of, is allowed to write, then his request will be granted.
To deploy this authorizer we build a jar file from the project - all dependencies aside from hadoop-common are scoped as provided, since the code will run within the Kafka JVM, where everything that is needed should already be on the classpath. If this is deployed to a Kafka cluster running as part of a larger Hadoop stack, you could mark hadoop-common as provided as well and add the existing Hadoop libs to the classpath instead.
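To illustrate the scoping, here is roughly what the relevant part of the build file could look like, assuming a Maven build (artifact names and versions are examples, not taken from the actual project):

```xml
<dependencies>
  <!-- Provided: the broker JVM already has Kafka on its classpath -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
  </dependency>
  <!-- Bundled by default; switch to provided when running on a Hadoop node -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.5</version>
  </dependency>
</dependencies>
```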
After creating the .jar file, upload it to all machines on your (security enabled) cluster and set the following options in your broker configuration:
```
authorizer.class.name=com.opencore.kafka.ComplexAclAuthorizer
principal.builder.class=com.opencore.kafka.HadoopGroupMappingPrincipalBuilder
```
Then add your jar to the CLASSPATH before starting your Kafka brokers:

```
export CLASSPATH=/path/to/your/file.jar
./kafka-server-start.sh ../config/kafka.properties
```
Let's look at an example of what effect this has on authorization. I have created two topics, test and test2, and set permissions for the user sliebau on test and for a group that he is a member of on test2:
```
./kafka-acls.sh --list --authorizer-properties zookeeper.connect=10.0.0.5:2181/kafka_1.0
Current ACLs for resource `Topic:test`:
  User:sliebau@OPENCORE.COM has Allow permission for operations: Write from hosts: *
  User:sliebau@OPENCORE.COM has Allow permission for operations: Describe from hosts: *

Current ACLs for resource `Topic:test2`:
  group:supergroup has Allow permission for operations: Describe from hosts: *
  group:supergroup has Allow permission for operations: Write from hosts: *

Current ACLs for resource `Cluster:kafka-cluster`:
  group:supergroup has Allow permission for operations: Create from hosts: *
  User:sliebau@OPENCORE.COM has Allow permission for operations: Create from hosts: *
```
I now authenticate as sliebau and try to produce to both topics in turn. Authentication in this context can be either SASL or SSL based. For SSL the username is extracted from the Common Name of the certificate presented by the client. For SASL it depends on the actual implementation, but in principle the default behavior of Kafka is used.
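The getUserFromCertificate method used in the PrincipalBuilder snippet above was omitted there; a minimal sketch of how the username could be extracted from the subject DN might look like this (a hypothetical helper, not the exact code from the repository):

```java
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;

// Extract the CN from a subject DN like "CN=sliebau,OU=...,O=..."
private String getUserFromCertificate(String distinguishedName) {
    try {
        for (Rdn rdn : new LdapName(distinguishedName).getRdns()) {
            if ("CN".equalsIgnoreCase(rdn.getType())) {
                return rdn.getValue().toString();
            }
        }
    } catch (InvalidNameException e) {
        // If the DN cannot be parsed, fall back to using it verbatim
    }
    return distinguishedName;
}
```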
The authorizer log shows that both operations are allowed, however based on different ACLs:
```
# kafka-console-producer.sh --topic test --...
[2018-02-20 17:35:54,999] DEBUG Principal = User:sliebau@OPENCORE.COM is Allowed Operation = Describe from host = 10.0.0.9 on resource = Topic:test (kafka.authorizer.logger)
[2018-02-20 17:35:56,213] DEBUG operation = Write on resource = Topic:test from host = 10.0.0.9 is Allow based on acl = User:sliebau@OPENCORE.COM has Allow permission for operations: Write from hosts: * (kafka.authorizer.logger)
[2018-02-20 17:35:56,213] DEBUG Principal = User:sliebau@OPENCORE.COM is Allowed Operation = Write from host = 10.0.0.9 on resource = Topic:test (kafka.authorizer.logger)

# kafka-console-producer.sh --topic test2 --...
[2018-02-20 17:36:11,388] DEBUG Principal = User:sliebau@OPENCORE.COM is Allowed Operation = Describe from host = 10.0.0.9 on resource = Topic:test2 (kafka.authorizer.logger)
[2018-02-20 17:36:12,457] DEBUG operation = Write on resource = Topic:test2 from host = 10.0.0.9 is Allow based on acl = Group:supergroup has Allow permission for operations: Write from hosts: * (kafka.authorizer.logger)
[2018-02-20 17:36:12,457] DEBUG Principal = User:sliebau@OPENCORE.COM is Allowed Operation = Write from host = 10.0.0.9 on resource = Topic:test2 (kafka.authorizer.logger)
```
So there you have it, group based ACLs in Kafka with minimal effort.
If you want to play around with this yourself, the full code can be found on GitHub. All code in this repository is based on Kafka 1.0.0 and won't work with earlier versions without modifications. The entire API in this area of the code was changed heavily in KIP-189.
The PrincipalBuilder would, in theory, allow using any implementation of Hadoop's GroupMappingServiceProvider by configuring it in the Kafka broker config. As already mentioned though, passing configuration to a PrincipalBuilder is not currently possible, so for now this remains more of an academic exercise that may or may not become useful at some point.
I have built and released a version 0.1 of this code, which you should be able to stick on your Kafka classpath and get going right away, if you so choose. But please be aware of my advice above on using this in production!
You can download the compiled jar file here.