Tupl Wins the MAPFRE Award for Best Agricultural Operation Using Technology at Expo Agritech 2024
2024-12-10 · Press Releases
About the Author:
Joaquín Terrasa is a passionate engineer who enjoys tinkering with data technologies and AI in his free time. An advocate of open-source technologies, he also collaborates with organizations like GDG or Spain AI. Currently, he is working as a Software Engineer in the Data Layer team at Tupl.
About this article:
Scala is a great candidate for building microservices: it has strong compiler checks, it is built with scalability in mind, and it runs on the Java Virtual Machine (JVM).
However, when building services that process high volumes of data, you will also require high-performance solutions, such as streaming or asynchronous computations. This leads to a more complex codebase; therefore, it becomes essential to apply abstractions.
There are several projects that offer a solution to this problem, such as scalaz or cats. cats provides modular and efficient abstractions from the functional programming (FP) world, extending Scala's FP core. As FP relies on type theory, category theory and lambda calculus, we can carefully control the application logic and catch at compile time more of the bugs that would otherwise only show up at runtime.
In this article, we will focus on cats, which also comes with a full-fledged ecosystem, with libraries like circe, for processing JSON objects, or sttp, for building HTTP clients.
Let's take the following case as an example - a method that builds an email report, and retrieves the email IDs and subjects from the email service. It succeeds only if the email list is not empty and all emails contain a subject; otherwise, it fails.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

case class IdAndSubject(id: String, subject: Option[String])

def getEmailIdAndSubjectFromSender(sender: String): Future[List[IdAndSubject]] = ???

def addBodyHeader(content: String): String = s"Body: ${System.lineSeparator}$content"

def buildEmailReport(sender: String): Future[String] =
  getEmailIdAndSubjectFromSender(sender)
    .map(buildEmailReportContent)
    .map(addBodyHeader)

def buildEmailReportContent(idAndSubjectList: List[IdAndSubject]): String =
  if (idAndSubjectList.nonEmpty) {
    idAndSubjectList.find(_.subject.isEmpty) match {
      case Some(IdAndSubject(id, None)) =>
        throw new Exception(s"Missing subject for email with ID $id")
      case _ =>
        idAndSubjectList
          .map { case IdAndSubject(id, Some(subject)) => s"$id: $subject" }
          .mkString(System.lineSeparator)
    }
  } else {
    throw new Exception("Email list is empty")
  }

def main(args: Array[String]): Unit = {
  val invalidEmailAndReports =
    List(IdAndSubject("1", Some("Meeting at 16:00")), IdAndSubject("2", None))
  val validEmailAndReports =
    List(IdAndSubject("1", Some("Meeting at 16:00")), IdAndSubject("2", Some("User verification")))

  println(buildEmailReportContent(validEmailAndReports))
  println(buildEmailReportContent(invalidEmailAndReports))
}
The main issue here is that exceptions break the function's referential transparency, that is to say, its evaluation can cause side effects. Moreover, its behaviour cannot be fully tracked in the type system, as exceptions are not included in the method's signature. A possible solution is to use the Either type:
def buildEmailReport(sender: String): Future[Either[String, String]] =
  getEmailIdAndSubjectFromSender(sender)
    .map(buildEmailReportContentEither)
    .map(_.right.map(addBodyHeader))

def buildEmailReportContentEither(idAndSubjectList: List[IdAndSubject]): Either[String, String] =
  Either.cond(idAndSubjectList.nonEmpty, reduceIdAndSubjectList(idAndSubjectList), "Email list is empty") match {
    case Right(right @ Right(_)) => right
    case Right(left @ Left(_))   => left
    case Left(message)           => Left(message)
  }

def reduceIdAndSubjectList(idAndSubjects: List[IdAndSubject]): Either[String, String] =
  idAndSubjects.find(_.subject.isEmpty) match {
    case Some(IdAndSubject(id, None)) =>
      Left(s"Missing subject for email with ID $id")
    case _ =>
      val result = idAndSubjects.map { case IdAndSubject(id, Some(subject)) => s"$id: $subject" }
      Right(result.mkString(System.lineSeparator))
  }
However, this can be cumbersome if you are using Scala 2.11 or earlier, as Either is unbiased there: you have to call .right or .left explicitly to work with the right or left projection, respectively. In more recent versions, you may still struggle with the nested result, as you need to unpack the Future first and then the Either.
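To make the nesting problem concrete, here is a minimal sketch that uses only the standard library and assumes Scala 2.12 or later. The `findUserId` and `findInboxSize` lookups are hypothetical stand-ins and are not part of the email service used in this article.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NestedEitherSketch {
  // Hypothetical lookups that can fail with an error message
  def findUserId(name: String): Future[Either[String, Int]] =
    Future.successful(if (name == "admin") Right(1) else Left(s"Unknown user $name"))

  def findInboxSize(userId: Int): Future[Either[String, Int]] =
    Future.successful(Right(42))

  // Chaining the two calls means unpacking the Future first and then the Either,
  // and re-wrapping the Left by hand so that the types line up
  def inboxSizeFor(name: String): Future[Either[String, Int]] =
    findUserId(name).flatMap {
      case Right(userId) => findInboxSize(userId)
      case Left(error)   => Future.successful(Left(error))
    }

  def main(args: Array[String]): Unit = {
    println(Await.result(inboxSizeFor("admin"), 5.seconds))
    println(Await.result(inboxSizeFor("guest"), 5.seconds))
  }
}
```

Every additional step adds another pattern match of this kind, which is the boilerplate that the abstractions discussed in the rest of the article aim to remove.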
One of the key goals of cats is to provide an unobtrusive and performant way to add well-defined behaviour to your Scala application. This is possible because it is based on functional programming concepts, whose behaviour is specified by mathematical laws. These concepts might be hard to grasp at first, but in the long run, they provide a solid foundation to build your applications.
A good thing about cats is that you can start by using only the modules that you find useful. We will take a look at some of the most used ones, such as Either, Validated and Semigroup, and we will see why they are useful for us.
A semigroup is a set (in Scala, a type) equipped with an associative binary operation, such as the natural numbers with + (addition). The operation is closed: the sum of two natural numbers is always another natural number, for example 3 + 4 = 7. It is also associative: (1 + 2) + 3 = 1 + (2 + 3), so the way the operations are grouped does not change the result.
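As a small illustration, the sketch below uses the Semigroup type class from cats with Int (which stands in for the natural numbers here) and String. The `MaxInt` wrapper and its instance are hypothetical and only show that a type can be given its own associative operation.

```scala
import cats.Semigroup
import cats.implicits._

object SemigroupSketch {
  // Closure: combining two Ints yields another Int
  val sum: Int = Semigroup[Int].combine(3, 4)

  // Associativity: the grouping of the operations does not change the result
  val groupedLeft: Int = (1 |+| 2) |+| 3
  val groupedRight: Int = 1 |+| (2 |+| 3)

  // Strings form a semigroup under concatenation
  val greeting: String = "Hello, " |+| "cats"

  // Hypothetical wrapper with a hand-written instance that keeps the larger value
  final case class MaxInt(value: Int)

  implicit val maxIntSemigroup: Semigroup[MaxInt] = new Semigroup[MaxInt] {
    def combine(x: MaxInt, y: MaxInt): MaxInt = if (x.value >= y.value) x else y
  }

  val largest: MaxInt = MaxInt(3) |+| MaxInt(7)
}
```

The `|+|` operator is just syntax for `Semigroup#combine`, so any type with an instance in scope can be combined in the same way.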
If we are handling a collection of Either values, we can use the Semigroup instance of Either to combine them with reduce. This instance short-circuits: if at least one Left is found, the result is the first Left; otherwise, the Right values are combined using their own Semigroup. Option also has an instance, although there None acts as a neutral element instead of short-circuiting.
import cats.implicits._

def buildEmailReport(sender: String): Future[Either[String, String]] =
  getEmailIdAndSubjectFromSender(sender)
    .map(buildEmailReportContentSemigroup)
    .map(_.right.map(addBodyHeader))

def buildEmailReportContentSemigroup(idAndSubjectList: List[IdAndSubject]): Either[String, String] =
  Either.cond(idAndSubjectList.nonEmpty, reduceIdAndSubjectList(idAndSubjectList), "Email list is empty") match {
    case Right(right @ Right(_)) => right
    case Right(left @ Left(_))   => left
    case Left(message)           => Left(message)
  }

def reduceIdAndSubjectList(idAndSubjects: List[IdAndSubject]): Either[String, String] =
  idAndSubjects
    .map { case IdAndSubject(id, subject) =>
      Either.cond(
        subject.isDefined,
        List(s"$id: ${subject.get}"),
        s"Missing subject for email with ID $id"
      )
    }
    .reduce(_ |+| _)
    .map(_.mkString(System.lineSeparator))
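To see the combination rules in isolation, the following sketch applies `|+|` to a few hand-written values. The `Result` alias and the sample values are assumptions made for illustration only.

```scala
import cats.implicits._

object EitherCombineSketch {
  // Illustrative alias: an error message on the left, report lines on the right
  type Result = Either[String, List[String]]

  val first: Result = Right(List("1: Meeting at 16:00"))
  val second: Result = Right(List("2: User verification"))
  val failureA: Result = Left("Missing subject for email with ID 3")
  val failureB: Result = Left("Missing subject for email with ID 4")

  // Two Right values are merged with the Semigroup of List (concatenation)
  val merged: Result = first |+| second

  // As soon as a Left takes part, the first Left is the result
  val failed: Result = List(first, failureA, second, failureB).reduce(_ |+| _)

  // For Option, None does not short-circuit: it is simply ignored
  val optionSum: Option[Int] = Option(1) |+| None
}
```

Note that `reduce` throws on an empty list, which is why the function above is only called after checking that the list is not empty (the second argument of `Either.cond` is evaluated lazily).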
As stated earlier, the Either support from cats is especially useful in Scala 2.11 or earlier. It makes Either right-biased, as it is in Scala 2.12 and later, and it adds some functionality that makes our life easier:
import cats.implicits._

def buildEmailReport(sender: String): Future[Either[String, String]] =
  getEmailIdAndSubjectFromSender(sender)
    .map(buildEmailReportContentEither)
    .map(_.map(addBodyHeader))

def buildEmailReportContentEither(idAndSubjectList: List[IdAndSubject]): Either[String, String] =
  Either
    .cond(idAndSubjectList.nonEmpty, reduceIdAndSubjectList(idAndSubjectList), "Email list is empty")
    .flatMap(identity) // now we can use .flatMap, .map and .leftMap without calling .right or .left

def reduceIdAndSubjectList(idAndSubjects: List[IdAndSubject]): Either[String, String] =
  idAndSubjects.find(_.subject.isEmpty) match {
    case Some(IdAndSubject(id, None)) =>
      Left(s"Missing subject for email with ID $id")
    case _ =>
      val result = idAndSubjects.map { case IdAndSubject(id, Some(subject)) => s"$id: $subject" }
      Right(result.mkString(System.lineSeparator))
  }
In addition, we can use EitherT, which provides methods for manipulating Either instances inside a container, such as Future[Either[_, _]] or List[Either[_, _]]. This is particularly helpful with the for {...} yield syntax, as it unwraps both the container and the Either to give us the Right value directly:
import cats.implicits._
import cats.data.EitherT

def buildEmailReport(sender: String): Future[Either[String, String]] = {
  val result = for {
    idAndSubjects <- EitherT.right(getEmailIdAndSubjectFromSender(sender))
    content <- EitherT.fromEither[Future](buildEmailReportContentEither(idAndSubjects))
    contentWithHeader = addBodyHeader(content)
  } yield contentWithHeader

  result.value // gets Future[Either[String, String]] from EitherT[Future, String, String]
}

def buildEmailReportContentEither(idAndSubjectList: List[IdAndSubject]): Either[String, String] =
  Either
    .cond(idAndSubjectList.nonEmpty, reduceIdAndSubjectList(idAndSubjectList), "Email list is empty")
    .flatMap(identity) // now we can use .flatMap, .map and .leftMap without calling .right or .left

def reduceIdAndSubjectList(idAndSubjects: List[IdAndSubject]): Either[String, String] =
  idAndSubjects.find(_.subject.isEmpty) match {
    case Some(IdAndSubject(id, None)) =>
      Left(s"Missing subject for email with ID $id")
    case _ =>
      val result = idAndSubjects.map { case IdAndSubject(id, Some(subject)) => s"$id: $subject" }
      Right(result.mkString(System.lineSeparator))
  }
Note that to get Future[Either[String, String]] instead of EitherT[Future, String, String], you must use the .value method.
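As a further sketch of how the wrapping and unwrapping fit together, the example below mixes a call that already returns a Future of Either with one that returns a plain Future, and rewrites the error before unwrapping. The `fetchSubject` and `fetchSender` functions are hypothetical, and the sketch assumes cats 1.0 or later (where `EitherT.liftF` is available).

```scala
import cats.data.EitherT
import cats.implicits._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EitherTSketch {
  // Hypothetical call that can fail with an error message
  def fetchSubject(id: String): Future[Either[String, String]] =
    Future.successful(
      if (id == "1") Right("Meeting at 16:00")
      else Left(s"Missing subject for email with ID $id")
    )

  // Hypothetical call that cannot fail in the Either sense
  def fetchSender(id: String): Future[String] =
    Future.successful("admin")

  def describe(id: String): Future[Either[String, String]] = {
    val result = for {
      subject <- EitherT(fetchSubject(id))                            // wraps Future[Either[String, String]]
      sender  <- EitherT.liftF[Future, String, String](fetchSender(id)) // lifts Future[String] into a Right
    } yield s"$sender: $subject"

    // leftMap only touches the error side; value unwraps back to Future[Either[...]]
    result.leftMap(error => s"Error: $error").value
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(describe("1"), 5.seconds))
    println(Await.result(describe("2"), 5.seconds))
  }
}
```

Keeping the EitherT wrapper inside the function and returning the plain nested type is one possible design choice; it keeps callers independent of cats at the cost of re-wrapping when several such functions are chained.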
When we need to handle multiple inputs, an efficient way to validate them is to check them separately and accumulate the errors, if any. If you are using Either or Option, you are tied to solving errors one by one, as they can short-circuit. To solve this problem, we use Validated, which will keep all the errors in a collection.
For example, we can validate the training data and hyper-parameters used to train a Machine Learning model. Note that the Validation alias and its companion object, defined in the following snippet, provide a thin abstraction that keeps the syntax similar to that of Try, and that errors are accumulated in a non-empty list.
import cats.implicits._
import cats.data.{NonEmptyList, Validated, ValidatedNel}
import scala.util.{Success, Try}

sealed trait ValidationError { val message: String }
case class InvalidData(message: String) extends ValidationError
case class InvalidParameter(message: String) extends ValidationError

type Validation[T] = ValidatedNel[ValidationError, T]

object Validation {
  def success[T](t: T): Validation[T] = Validated.valid(t)
  def failure[T](e: ValidationError): Validation[T] = Validated.invalidNel(e)
}

case class TrainingParameters(rows: Seq[Row], hyperParameters: Map[String, String])
case class Row(id: String, sepalLength: String, sepalWidth: String)

def validateTrainingParameters(data: String, hyperParameters: Map[String, String]): Validation[TrainingParameters] = {
  val validatedData = validateData(data)
  val validatedParameters = validateHyperParameters(hyperParameters)

  (validatedData, validatedParameters).mapN(TrainingParameters)
}
We need to make sure that the number of epochs is between 1 and 1000, and that the model is one of the supported algorithms. In addition, we want to check that the CSV input contains the expected columns.
val CsvHeader: String = "id,sepalLength,sepalWidth"
val maxEpochs: Int = 1000
val validAlgorithms: Seq[String] = List("SVM", "LogisticRegression", "SGD")

// A row is valid only if it has exactly three comma-separated fields
def validateRow(row: String): Validation[Row] = {
  row.split(",") match {
    case Array(id, sepalLength, sepalWidth) =>
      Validation.success(Row(id, sepalLength, sepalWidth))
    case row =>
      val message = s"Invalid row ${row.mkString("Array(", ", ", ")")}"
      Validation.failure(InvalidData(message))
  }
}

// Checks the header and then validates every row, accumulating the errors
def validateData(data: String): Validation[List[Row]] = {
  data.split(System.lineSeparator).toList.toNel match {
    case Some(NonEmptyList(CsvHeader, rows)) => rows.map(validateRow).sequence
    case Some(NonEmptyList(header, _))       => Validation.failure(InvalidData(s"Unexpected header $header"))
    case None                                => Validation.failure(InvalidData("Empty CSV"))
  }
}

def validateHyperParameters(hyperParameters: Map[String, String]): Validation[Map[String, String]] = {
  val validModel = hyperParameters.get("model") match {
    case Some(model) if validAlgorithms.contains(model) => Validation.success(model)
    case Some(model)                                    => Validation.failure(InvalidParameter(s"Unknown model $model"))
    case None                                           => Validation.failure(InvalidParameter("Model not found"))
  }

  val validEpochs = Try(hyperParameters.get("epochs").map(_.toInt)) match {
    case Success(Some(epochs)) if 1 to maxEpochs contains epochs => Validation.success(epochs.toString)
    case _                                                       => Validation.failure(InvalidParameter("Epochs parameter is not valid"))
  }

  (validModel, validEpochs).mapN((model, epochs) => Map("model" -> model, "epochs" -> epochs))
}

def validateTrainingParameters(data: String, hyperParameters: Map[String, String]): Validation[TrainingParameters] = {
  val validatedData = validateData(data)
  val validatedParameters = validateHyperParameters(hyperParameters)

  (validatedData, validatedParameters).mapN(TrainingParameters)
}
This way, we are able to report multiple errors in our systems in a single pass:
def main(args: Array[String]): Unit = {
  val invalidData =
    """id,petalLength,petalWidth
      |field1/flower1,1.1,1.67
      |""".stripMargin

  val validData =
    """id,sepalLength,sepalWidth
      |field1/flower1,3.1,2.7
      |field1/flower2,2.56,2.81
      |field2/flower1,1.0,2.05
      |""".stripMargin

  val hyperParameters = Map("model" -> "SGD", "epochs" -> "100")
  val invalidHyperParameters = Map("epochs" -> "1001", "model" -> "Perceptron")

  println(validateTrainingParameters(validData, hyperParameters))
  // Valid(TrainingParameters(List(Row(field1/flower1,3.1,2.7), Row(field1/flower2,2.56,2.81), Row(field2/flower1,1.0,2.05)),Map(model -> SGD, epochs -> 100)))

  println(validateTrainingParameters(invalidData, invalidHyperParameters))
  // Invalid(NonEmptyList(InvalidData(Unexpected header id,petalLength,petalWidth), InvalidParameter(Unknown model Perceptron), InvalidParameter(Epochs parameter is not valid)))
}
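The difference between short-circuiting and accumulation can also be seen side by side. The sketch below runs the same two checks first as Either and then as ValidatedNel; the `checkModel` and `checkEpochs` helpers are simplified, hypothetical versions of the validations above, and the sketch assumes Scala 2.13 (or 2.12 with partial unification enabled).

```scala
import cats.data.{NonEmptyList, Validated, ValidatedNel}
import cats.implicits._

object AccumulationSketch {
  // Simplified, hypothetical checks that report errors as plain strings
  def checkModel(model: String): Either[String, String] =
    Either.cond(model == "SGD", model, s"Unknown model $model")

  def checkEpochs(epochs: Int): Either[String, Int] =
    Either.cond(epochs >= 1 && epochs <= 1000, epochs, "Epochs parameter is not valid")

  // Either stops at the first failure, so the epochs error is never reported
  val viaEither: Either[String, String] =
    (checkModel("Perceptron"), checkEpochs(1001)).mapN((model, epochs) => s"$model/$epochs")

  // The same checks converted to ValidatedNel keep every error
  val viaValidated: ValidatedNel[String, String] =
    (checkModel("Perceptron").toValidatedNel, checkEpochs(1001).toValidatedNel)
      .mapN((model, epochs) => s"$model/$epochs")
}
```

Here `viaEither` holds only the model error, while `viaValidated` holds both errors in a NonEmptyList.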
OptionT lets us work with Option instances that are inside a container type, much like EitherT does for Either:
import cats.data.OptionT
import cats.implicits._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Future, Await}

def getSenderNameFromSender(sender: String): Option[String] = sender match {
  case "admin" => Some("admin@internet.com")
  case _       => Some("all.developers@internet.com")
}

def getLastEmailSubjectFromSender(sender: String): Future[Option[String]] = sender match {
  case "admin" => Future.successful(None)
  case _       => Future.successful(Some("[Blocker] Unknown bug"))
}

def getLastEmailBodyFromSender(sender: String): Future[Option[String]] =
  Future.successful(Some("Hi, are you free? I need help with a bug"))

def buildLastEmailReport(sender: String): Future[Option[String]] = {
  val result = for {
    emailSubject <- OptionT(getLastEmailSubjectFromSender(sender))
    emailBody <- OptionT(getLastEmailBodyFromSender(sender))
    senderName <- OptionT.fromOption[Future](getSenderNameFromSender(sender))
  } yield s"$senderName: $emailSubject ${System.lineSeparator} $emailBody"

  result.value // gets Future[Option[String]] from OptionT[Future, String]
}

def main(args: Array[String]): Unit = {
  println(Await.result(buildLastEmailReport("admin"), Duration.Inf))
  println(Await.result(buildLastEmailReport("Joseph"), Duration.Inf))
}
This way, it builds a report for the last email from the sender only if the subject, the body and the sender name are all available; if any of them is missing, the result is None.
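When a missing value should be replaced by a default instead of propagating None, OptionT can also collapse the Option at the end. The sketch below is one way to do it; the `lastSubject` function and the placeholder text are hypothetical.

```scala
import cats.data.OptionT
import cats.implicits._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OptionTSketch {
  // Hypothetical lookup: the admin account has no emails yet
  def lastSubject(sender: String): Future[Option[String]] = sender match {
    case "admin" => Future.successful(None)
    case _       => Future.successful(Some("[Blocker] Unknown bug"))
  }

  // map transforms the value only when it is present;
  // getOrElse supplies a default and returns a plain Future[String]
  def subjectOrPlaceholder(sender: String): Future[String] =
    OptionT(lastSubject(sender))
      .map(_.toUpperCase)
      .getOrElse("(no subject)")

  def main(args: Array[String]): Unit = {
    println(Await.result(subjectOrPlaceholder("admin"), 5.seconds))
    println(Await.result(subjectOrPlaceholder("Joseph"), 5.seconds))
  }
}
```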
By extending the core FP functionalities of Scala, cats provides a sound foundation for developing safer, simpler and more efficient applications. Since we started using cats, we have increased the number of bugs we catch in the testing stage, and our codebase is more maintainable. Moreover, it also plays nicely with some high-performance frameworks we use, such as Apache Kafka or Akka.
If you want to learn more about cats, we recommend taking a look at the available resources for learners.
• Cats library