Spark-On-HBase in Cluster Mode with Secure HBase

written by Lars George on 2016-03-18

Running MapReduce or Spark jobs on YARN that process data in HBase is easy… or so they said until someone added Kerberos to the mix!

There are many processing engines in Hadoop, some of which orchestrate their work on their own. The majority, though, rely on a central resource manager and scheduler that allows workloads and engines to be mixed, while ensuring the various processes receive their fair share of the available resources. For Hadoop, this is usually provided by YARN, the successor of the monolithic MapReduce framework that existed before Hadoop 2.0. With YARN it is now possible to run any compliant application in a managed and controlled way, whether it performs complex machine learning computations, analytical SQL queries, or mundane ETL workloads.

Speaking of machine learning, the weapon of choice today is Apache Spark, which provides a flexible API that can serve as the foundation for a wide range of tasks, such as building recommendation engine models, clustering entities, detecting fraud, and much more. Spark has its origins in a different cluster-wide resource management framework, namely Apache Mesos. In fact, Spark was one of the initial applications developed for Mesos a few years ago, but it has since been extended to support other frameworks. To that end, Spark has three execution modes:

  • Local
  • Client
  • Cluster

The local mode is for testing on a single development machine; the difference between client and cluster mode is how job control is handled: in client mode you can interact with the worker processes, while in cluster mode you forfeit this control and let the job run remotely after handing it to the ResourceManager. See this blog post for more details. For the latter two modes there are further choices, representing the supported execution frameworks:

  • Standalone
  • Mesos
  • YARN

Spark has been developed to be fairly agnostic to the framework it runs on; however, for the remainder of this post we will focus on YARN as the execution platform.

Hadoop on its own is a very trusting system: it allows pretty much anybody access to all data, and even lets anybody impersonate anybody else without any checks, making it every auditor's worst nightmare. So let us look at adding security to the mix...

The state of the art for perimeter security in Hadoop, that is, authentication of clients and servers, is Kerberos. Microsoft Active Directory and MIT Kerberos are common choices that are often used in combination with Hadoop. The basic idea behind Kerberos is that there is a central, trusted service that allows parties to authenticate themselves by first contacting this authority: the KDC (Kerberos Key Distribution Centre). Interactive clients usually authenticate through a password prompt, perhaps combined with an additional two-factor mechanism such as a number-generating fob or a fingerprint reader. After successful authentication the KDC issues you a ticket that you can use to contact participating servers, for example through RPC calls. You need to send your ticket with each request to prove it is really you calling in.

Note: There is a complete overload of terms like driver and client in YARN and Spark. The former talks about a driver (and has a Driver class to go along with it) as something that bootstraps a job, which is then run on the cluster. This is accomplished through the YARN client. Spark, though (again most likely because of its Mesos, or rather non-YARN, heritage), has a Client class that bootstraps a job and launches a driver program, which then orchestrates the actual worker processes. Confused? Yep.

When a Hadoop cluster with Kerberos enabled receives a client request, it validates the ticket and performs the action you are asking for. This is true for YARN job submissions too, where the client driver application connects via the API of the ResourceManager (RM) and hands in the job details. The RM adds the job to the work queue and handles its fate from there on. Now assume you have written a MapReduce program that is executed and reads data from HBase. Remember that ticket you need to call any secured RPC endpoint? Same thing here, but since job execution is scheduled asynchronously, you have no ticket anymore. Or rather, by the time the RM starts the ApplicationMaster (AM) on a physical server, by means of the NodeManager (NM) running there, it has long stopped talking to you and has no recollection of your ticket.

How can you still access HBase? In general, how can servers that are not interactively calling RPC endpoints still obtain a ticket? This is facilitated using a password stored in a file, usually called the keytab file. That file is handed to the server processes by means of a configuration option and loaded at start time. After that, each server authenticates against the KDC and obtains tickets just like an interactive user would. This means one way of enabling a job to talk to a secure HBase setup is to (foolishly) send the keytab with the job using the Distributed Cache provided by YARN. The issue is that you are sending a wildcard access token to a remote server and have to trust its security configuration, so that no one else can get access to that file.
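
As a side note, this keytab-based login is what a long-running, non-interactive process does at start time. Here is a minimal sketch using Hadoop's UserGroupInformation class; the principal name and keytab path are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

// Minimal sketch: authenticate a non-interactive process against the KDC
// using a keytab file. Principal and keytab path are placeholders.
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
  "hbase/host.example.com@EXAMPLE.COM",
  "/etc/security/keytabs/hbase.service.keytab");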

Sending a keytab is just bad form: it is not a safe choice, and in addition it would allow the recipient to perform any operation the principal is allowed to invoke. Hadoop's file system HDFS recognized this early on and switched to a delegation token concept. These tokens are issued by the NameNode (NN) and can be used by clients to access its services safely - but only for HDFS related operations. Note that delegation tokens have nothing to do with Kerberos; they are a custom Hadoop implementation and use symmetric encryption with an in-memory, server-side secret to ensure the token is indeed one that was issued by the server process, and is still valid. That is, tokens have an expiration time, which ensures they go stale after a configured amount of time (explained for HBase below).
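
To make the idea concrete, here is a minimal sketch of how a client could ask the NameNode for HDFS delegation tokens through the standard FileSystem API; using "yarn" as the renewer is an assumption for a YARN setup:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;

// Sketch: obtain HDFS delegation tokens for the current (Kerberos
// authenticated) user and collect them in a Credentials object, which
// could then be attached to a job. The renewer name is an assumption.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Credentials credentials = new Credentials();
Token<?>[] tokens = fs.addDelegationTokens("yarn", credentials);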

HBase added the same mechanism sometime in 2011, allowing clients to retrieve a delegation token with which they can communicate with a server, without needing a Kerberos ticket anymore. This is where the loop closes: to solve the above issue of scheduled jobs having no ticket, yet needing to talk to servers via RPC, we use delegation tokens. In the driver class that submits the job we can retrieve a token using the HBase client classes, which is subsequently encoded into a string:

// Connect to HBase using the submitter's Kerberos credentials and
// request a delegation token for later, ticket-less access.
Connection connection = ConnectionFactory.createConnection(conf);
Token<AuthenticationTokenIdentifier> token = 
  TokenUtil.obtainToken(connection);
// Serialize the token, write it to a local file, and ship that file
// with the job via the Distributed Cache under the alias "token".
String urlString = token.encodeToUrlString();
File temp = new File(FileUtils.getTempDirectory(), "token");
FileUtils.writeStringToFile(temp, urlString);
job.addCacheFile(new URI(temp.getPath() + "#token"));
...

Then, inside the class that is run to execute the job on the server side, we can recreate the token by parsing the string, and add it to the credentials of the current user:

// Read the serialized token back from the Distributed Cache file,
// deserialize it, and add it to the current user's credentials so the
// HBase client picks it up for its RPC calls.
String strToken = FileUtils.readFileToString(new File("token"));
Token token = new Token();
token.decodeFromUrlString(strToken);
UserGroupInformation.getCurrentUser().addToken(token);

One thing missing so far is how the serialized token is transported to the workers executing the job class. There are a few options, including the job configuration: you could add the token (in its serialized text form) under a custom key and read it back later on. That, though, would cause the token to appear in logs and/or the job configuration details, potentially visible to many cluster users. Better here is, again, the Distributed Cache: you save the token to a local text file and attach it as a file to the job, which then makes it available inside the job tasks. Even if someone gets access to that file - which would be as easy or difficult as with the above naive attachment of the keytab (don't!) - they would only be able to use that token for a limited amount of time, and would not be able to create new tickets forever.

And since we have mentioned the lifetime limitation of tokens twice now (for HDFS earlier and now for HBase), this is set using the following keys, shown in the context of the code reading them in RpcServer.java:

long keyUpdateInterval = conf.getLong("hbase.auth.key.update.interval", 
  24*60*60*1000);
long maxAge = conf.getLong("hbase.auth.token.max.lifetime", 
  7*24*60*60*1000);
return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
  server.getServerName().toString(), keyUpdateInterval, maxAge);

What happens here is that every server in the HBase cluster sets up such a secret manager. It is used to create new delegation tokens with a specific, though also lifetime-restricted, key. But how can a distributed system agree on only one currently active key, so that the servers can encrypt and decrypt the tokens between them? Well, that is where, once more, ZooKeeper comes in. Each secret manager partakes in a leader election, just like the HBase Masters do, finding consensus and agreeing on only one of them being active. Once that is complete, the active manager generates the first key and stores it in a dedicated znode - protected by Kerberos, of course. Every 24 hours by default the key is rolled and a new one created, which is then also stored in ZooKeeper. The manager cleans up the oldest keys once the lifetime of any token they may have been used for has expired. That lifetime is seven days by default, so while a token is usable for seven days, its corresponding keys are kept in ZooKeeper for just as long.

The above is so far the same for both YARN and Spark jobs accessing HDFS or HBase, except for one difference concerning the execution modes of the latter. In Spark client mode, your driver - which is in this case located inside your client application - stays in contact with the cluster, which runs the actual tasks. You are authenticated using your ticket to communicate with the servers. And what is even better, since Spark 1.4 there is an automated process that, provided you have the HBase libraries and an hbase-site.xml on your class path, will add an HBase delegation token (among others) to the Spark context and ensure the remote workers can talk to the equally remote HBase nodes. In Spark 1.6 there is now also an option to turn off that feature, if needed. What Spark does is what we have already seen earlier: it uses the Distributed Cache of YARN to supply the retrieved token to the workers, which read it implicitly without any additional work inside your code. Sweet!
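
To illustrate the client mode case, here is a minimal sketch of a Spark job scanning an HBase table via TableInputFormat; the table name is made up, and we assume the HBase jars and hbase-site.xml are on the class path so that Spark 1.4+ obtains and ships the delegation token for us:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch: scan an HBase table from Spark. The HBase delegation token is
// obtained and distributed by Spark itself, so no token handling code is
// needed here. The table name is a placeholder.
SparkConf sparkConf = new SparkConf().setAppName("hbase-scan-example");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(TableInputFormat.INPUT_TABLE, "test_table");

JavaPairRDD<ImmutableBytesWritable, Result> rows = sc.newAPIHadoopRDD(
  hbaseConf, TableInputFormat.class,
  ImmutableBytesWritable.class, Result.class);
System.out.println("Rows scanned: " + rows.count());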

What now about cluster mode? In that case the Spark driver is located in the ApplicationMaster on the remote server, with no interactive connection from the client, and therefore no valid Kerberos ticket. Here the Spark client helps again, adding the token in the same way, but through one more hoop. Here you often see the use of the SparkLauncher class, which wraps the Spark command line in a Java process (but requires Scala and Spark to be installed on the launcher machine, yuk!). Inside, the actual Spark Client class bootstraps the job, so you must pass on the HBase libraries and the hbase-site.xml file and ensure they are added to the class path built for the Spark launcher environment. After that the rest will work as described… easy, eh?
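
As a rough sketch of that last step - all paths, jar names, and the application jar and class below are placeholders - a SparkLauncher invocation for cluster mode could look like this:

import org.apache.spark.launcher.SparkLauncher;

// Sketch: launch a Spark job in YARN cluster mode, adding the HBase client
// jars and hbase-site.xml so the Spark Client class can obtain the HBase
// delegation token before handing control to the ApplicationMaster.
Process spark = new SparkLauncher()
  .setAppResource("/path/to/my-hbase-job.jar")
  .setMainClass("com.example.MyHBaseJob")
  .setMaster("yarn")
  .setDeployMode("cluster")
  .addJar("/opt/hbase/lib/hbase-client.jar")
  .addJar("/opt/hbase/lib/hbase-common.jar")
  .addFile("/etc/hbase/conf/hbase-site.xml")
  .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/etc/hbase/conf")
  .launch();
spark.waitFor();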

If you enjoy working on new technologies, traveling, consulting clients, or writing software and documentation, please reach out to us. We're always looking for new colleagues!