Spark 2.0 streaming from SSL Kafka with HDP 2.4

written by Oliver Meyn (Guest blog) on 2017-02-05

Oliver Meyn is located in Toronto, Canada and has worked with the Hadoop ecosystem since 2009. He currently works as a freelance Hadoop & Big Data consultant in Canada. He originally published this post on his own blog but agreed to repost it here. Thank you very much Oliver!


Hortonworks DataFlow (HDF) bundles Apache NiFi, Apache Kafka, and Apache Storm. Together these make a powerful combination for transporting and transforming data in near-real time. With the addition of Apache Spark for machine learning and model evaluation, Apache Hadoop for storage, and Apache Hive for later analytics querying, we have the complete chain for processing data from the Internet of Things (IoT). At a recent customer (in my work for T4G) we were asked to build this workflow, with the strong requirements that all Kafka communication be encrypted via SSL/TLS and that all parts of the architecture authenticate securely via Kerberos. These requirements caused some significant challenges, and this blog post explains how we overcame them. Note that all the code referenced later is available on GitHub.

The Architecture

The flow of data would be as in the following diagram - originating from television set-top boxes, transiting a NiFi instance outside the cluster firewall, through the SSL Kafka, picked up by Spark running inside the cluster (on YARN) for ML evaluation, and a second NiFi for writing directly to HDFS.

[Figure: Generic Streaming Architecture]

In this case the cluster version is HDP 2.4.2, which natively provides Spark version 1.6. There is an MIT Kerberos daemon (KDC) inside the cluster, accessible through the firewall. NiFi is a single instance of version 1.0 (from HDF 2.0), running outside the cluster. Kafka is a single 0.10 instance (from HDF), providing SSL connections through a self-signed certificate, and running inside the cluster.

NiFi Configuration

NiFi allows the connection of various "processors" into any number of workflows through a very user-friendly GUI. It originated as a product of the NSA, with a small subset of their processors included in the initial Apache release. Since it was open sourced many more processors have been added, but not all of them place security as the highest priority. In this case the PublishKafka_0_10 processor behaves very well and configuring the SSL context within NiFi and adding the requisite jaas.conf file with our Kerberos principal was straightforward (reasonably well described in the NiFi documentation). Make sure you're using the SASL_SSL Security Protocol. Once you have the PublishKafka processor working you can easily test connectivity by adding a ConsumeKafka_0_10 processor that writes to local files. A good idea is also to set up a simple workflow that can put test messages onto your Kafka topic of choice for later testing with the Spark consumer.

Spark Time!

In HDP 2.4.2 the shipped version of Spark is 1.6, and Spark 1.6 cannot speak SSL to Kafka. Full stop. This is because the Kafka client libraries changed between version 0.8 and 0.10, and SSL is only available as of 0.10 (as described by Databricks). For further proof, the JIRA that states it won't work is SPARK-12177 (note that Spark versioning moves from 1.6 to 2.0 - there's nothing in between). Sooooo.....

Spark time, for real this time!

Because we're running Spark on YARN, the application is self contained and in principle it should be easy (hah!) to package it up such that the cluster is none the wiser. Some differences between Spark 1.6 and 2.0 before we go further:

Spark Version     1.6    2.0
Scala Version     2.10   2.11
Uses Assembly?    Y      N
SSL Support?      N      Y

Hortonworks has a blog post that shows how to do it on HDP 2.5 - Spark 2 is officially supported by Hortonworks on HDP 2.5 but not on HDP 2.4. Spark versions before 2.0 used an assembly jar that held all the needed Spark libraries, and this was often hard to configure when building by hand. Starting with Spark 2 the libraries are loaded as needed and there is no more assembly. The steps for installing on 2.4 are basically the same as for 2.5:

  1. Download the Spark libraries from http://spark.apache.org/downloads.html, pre-built for Hadoop 2.7. For this project we used v2.0.2. We recommend using the Scala 2.11 package (the default) for ease of compatibility with other libraries.
  2. Unpack them into /usr/hdp/current/spark2-client on the machine where you start Spark jobs (typically an Edge node).
  3. The user who will run the Spark jobs needs SPARK_HOME set: $ export SPARK_HOME=/usr/hdp/current/spark2-client.
  4. Presuming you've got Spark 1.6 working fine already, copy its spark-defaults.conf to the new spark2-client directory (e.g. $ cp /usr/hdp/current/spark-client/conf/spark-defaults.conf /usr/hdp/current/spark2-client/conf/).
  5. Add/edit the following lines to the spark2-client's spark-defaults.conf:
    spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
    spark.executor.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
    spark.driver.extraJavaOptions -Dhdp.version=2.4.2.0-258
    spark.yarn.am.extraJavaOptions -Dhdp.version=2.4.2.0-258
    spark.hadoop.yarn.timeline-service.enabled false
    

Note the hdp.version parameters - it is critical that these match your actual cluster version. YARN uses them to build the classpath of its jobs, and HDP ships with some jars (notably lzo) that have the version number in their names, so if the value is wrong the classpath breaks and you'll get error messages like Can't find ApplicationLauncher. Similarly, the timeline service is disabled because some Jersey classes it needs are missing in the newer Spark.
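
Since a wrong hdp.version is so easy to get wrong, it can help to derive the value rather than type it. The sketch below is only an illustration: hdp_version_from_path is a hypothetical helper name, and it assumes the usual HDP layout in which /usr/hdp/current/hadoop-client is a symlink into /usr/hdp/<version>/. Check the result against your own cluster before trusting it.

```shell
# Hypothetical helper: extract the HDP build version (e.g. 2.4.2.0-258)
# from an already-resolved path of the form /usr/hdp/<version>/<component>.
# Prints nothing if the path does not have that shape.
hdp_version_from_path() {
  printf '%s\n' "$1" | sed -n 's:^/usr/hdp/\([0-9][0-9.]*-[0-9]*\)/.*$:\1:p'
}
```

On an Edge node this could be used as hdp_version_from_path "$(readlink -f /usr/hdp/current/hadoop-client)", with the output pasted into the two -Dhdp.version settings above.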

This is enough to get Spark 2 running in HDP 2.4, and you can test by calculating Pi:

$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples*.jar \
10

Note you'll have to kinit before submitting the job, but that should be enough to get you a few digits of pi. Since our goal is to speak SSL to Kafka from a Spark streaming app, we're not done yet. We need to tell Spark how to authenticate itself to Kafka (via Kerberos) and to trust the self-signed SSL certificate that Kafka presents.

Adding Kerberos

We need to provide a keytab for our Kerberos principal, and instructions on how to use the keytab in the form of a jaas.conf file (JAAS = Java Authentication and Authorization Service). Note that in these examples we use the principal gandalf@DOMAIN.COM - substitute your principal as needed.

jaas.conf:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./gandalf.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="gandalf@DOMAIN.COM";
};

With the jaas.conf and a keytab handy in the directory from which you'll launch your job, launch as follows:

$SPARK_HOME/bin/spark-submit \
--master yarn \
--files jaas.conf,gandalf.keytab \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dhdp.version=2.4.2.0-258" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--class com.example.ClassName uber-jar-with-deps-and-hive-site.jar

Note the hdp.version showing up again: by passing the --driver-java-options parameter we're overriding the parameter given in spark-defaults.conf so if we don't provide the hdp.version here as well as our security option the classpath will again break.
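
One way to avoid forgetting hdp.version whenever --driver-java-options is overridden is to build the option string in one place. This is a minimal sketch, not part of the original setup: driver_java_opts is a hypothetical helper that always emits -Dhdp.version first and then appends whatever extra JVM options it is given.

```shell
# Hypothetical helper: build the value for --driver-java-options so that
# -Dhdp.version is always present.
#   $1     the HDP version string (e.g. 2.4.2.0-258)
#   $2...  any additional JVM options, one per argument
driver_java_opts() {
  version=$1
  shift
  printf '%s' "-Dhdp.version=$version"
  for opt in "$@"; do
    printf ' %s' "$opt"
  done
  printf '\n'
}
```

The launch command would then pass --driver-java-options "$(driver_java_opts 2.4.2.0-258 -Djava.security.auth.login.config=./jaas.conf)" instead of a hand-written string.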

Adding SSL

Almost there - now we need to trust Kafka. If you have a CA-signed certificate in Kafka then you can technically skip this step, but it won't hurt. Java ships with a store of certificates that it implicitly trusts - these are the signing authorities of the Web. Here we take a copy of that store and add the certificate from Kafka to it.

Be careful with language here - Java only talks about keystores, but NiFi and Kafka refer to both keystores and truststores. They refer to keystores as those used by servers that provide a secure connection (e.g. Kafka in our case), and the keystore holds the certificates and private keys for that server. That is different from truststores, which hold the certificates that a client (e.g. Spark in our case) trusts implicitly when opening connections to servers. This is the same process as when using a web browser to connect to a secure web server - when you ignore the warnings that you're going to an insecure site (say if the certificate has expired, or the names don't match) and check the "Always ignore this warning" box, you're adding that certificate to your truststore.

This may require root privileges (and note that the default password for the Java cacerts store is changeit):

cp $JAVA_HOME/jre/lib/security/cacerts .
keytool -storepasswd -new change-me-to-something-safe -keystore cacerts
openssl s_client -connect your.broker.fqdn:9092 -showcerts </dev/null > kafka-certs.txt
keytool -import -alias kafka1 -file kafka-certs.txt -keystore cacerts -storepass change-me-to-something-safe
rm -f kafka-certs.txt
mv cacerts kafka.client.truststore.jks
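
The output of openssl s_client contains handshake details around the certificates, and with -showcerts it may contain more than one certificate. If keytool objects to that file, one option is to keep only the first PEM block, which is the certificate the broker itself presented. The following is a sketch under that assumption; first_pem_cert is a hypothetical helper name.

```shell
# Hypothetical helper: read openssl s_client output on stdin and print only
# the first PEM certificate block, dropping all surrounding text.
first_pem_cert() {
  awk '/-----BEGIN CERTIFICATE-----/ { inside = 1 }
       inside { print }
       /-----END CERTIFICATE-----/ { if (inside) exit }'
}
```

For example: openssl s_client -connect your.broker.fqdn:9092 -showcerts </dev/null | first_pem_cert > kafka-certs.txt, after which the keytool -import step can be run on the cleaned file.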

Now you have a truststore called kafka.client.truststore.jks and can pass that in when you start your Spark job:

$SPARK_HOME/bin/spark-submit \
--master yarn \
--files jaas.conf,gandalf.keytab,kafka.client.truststore.jks \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dhdp.version=2.4.2.0-258" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--class com.example.ClassName uber-jar-with-deps-and-hive-site.jar
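
Because the jaas.conf and the Kafka parameters refer to these files by relative path, a file missing from the launch directory is an easy mistake to make. A small pre-flight check can catch it before the job is submitted. This is an illustrative sketch only; missing_files is a hypothetical helper that takes the same comma-separated list you pass to --files.

```shell
# Hypothetical helper: print every entry of a comma-separated file list
# (the same string given to --files) that is not readable from the
# current directory. Prints nothing when all files are present.
missing_files() {
  old_ifs=$IFS
  IFS=,
  for f in $1; do
    [ -r "$f" ] || printf '%s\n' "$f"
  done
  IFS=$old_ifs
}
```

Running missing_files jaas.conf,gandalf.keytab,kafka.client.truststore.jks from the launch directory should print nothing; any name it does print needs to be put in place first.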

Adding SparkSQL Support (hive-site.xml)

In our case we needed SparkSQL support (for working with DataFrames) and that means bundling your hive-site.xml in the submitted Spark app. Taking a copy of the hive-site.xml from your cluster is fine, with one important change - make sure the hive.execution.engine is not tez, but mr:

<property>
  <name>hive.execution.engine</name>
  <value>mr</value>
</property>

Spark won't use MapReduce for anything, but when it tries to load the metastore it will get confused (read: everything breaks) if the engine is not 'mr'.
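
Since a copied hive-site.xml will usually still say tez, it is worth checking the file before bundling it into the jar. The sketch below assumes the property is laid out as shown above, with the value element on the same line as the name element or on the line right after it; hive_engine is a hypothetical helper name.

```shell
# Hypothetical helper: print the configured hive.execution.engine from a
# hive-site.xml file. Assumes <value> is on the same line as <name> or on
# the line immediately following it.
hive_engine() {
  grep -A1 '<name>hive.execution.engine</name>' "$1" |
    sed -n 's:.*<value>\(.*\)</value>.*:\1:p'
}
```

If hive_engine hive-site.xml prints anything other than mr, edit the file before building.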

Code

Finally we have all the pieces connected, and now we can write some code that actually reads messages from Kafka and does something useful. We'll leave the something useful to you, but here's some skeleton code that makes the connection (this code is also available on GitHub):

import grizzled.slf4j.Logger
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimplestStreaming {
  val logger = Logger[this.type]

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Simplest streaming (spark 2.0) from Kafka SSL")
      .enableHiveSupport()
      .getOrCreate()
    val sparkContext = spark.sparkContext

    // one micro-batch every 10 seconds
    val streamingContext = new StreamingContext(sparkContext, Seconds(10))
    // expects jaas.conf, appropriate keytab, and kafka.client.truststore.jks passed in as part of spark-submit
    val kafkaParams = Map[String, Object](
      // replace with the host and port of your Kafka broker's SSL listener
      "bootstrap.servers" -> "<broker-host>:<port>",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SASL_SSL",
      "ssl.truststore.location" -> "./kafka.client.truststore.jks",
      "ssl.truststore.password" -> "change-me-to-something-safe"
    )
    val topic = Set("sasl_ssl_test")

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topic, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // keep only the message payload of each Kafka record
      val df = rdd.map( consumerRecord => {
        consumerRecord.value()
      }).toDF()

      df.show()
    }

    // start the computation
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Maven

We use Maven to build the project and included in the code is the pom.xml that will let you build this class. Use $ mvn clean package and your uber jar (with all dependencies except Spark included) will be in the target/ dir.

Conclusion

There is a large dent in the wall where we banged our heads to get this all working, but hopefully we've now saved you the same fate. Good luck!