Using Spark from R for performance with arbitrary code - Part 1 - Spark SQL translation, custom functions, and Arrow

Introduction

Apache Spark is a popular open-source analytics engine for big data processing, and thanks to the sparklyr and SparkR packages, the power of Spark is also available to R users.

This series of articles will attempt to provide practical insights into using the sparklyr interface to gain the benefits of Apache Spark while still retaining the ability to use R code organized in custom-built functions and packages.

In this first part, we will examine how the sparklyr interface communicates with the Spark instance and what this means for the performance of arbitrarily defined R functions. We will also look at how Apache Arrow can improve the performance of object serialization.

Setting up Spark with R and sparklyr

Full instructions on setting up sparklyr are outside the scope of this article; below we provide only a quick set of instructions to get a local Spark instance working with sparklyr.

Apache Spark and R logos

Using a ready-made Docker Image

For the purpose of this series, we built a Docker image that you can use to experiment in any of the following ways, by running the corresponding command in a terminal. If you are using RStudio 1.1 or newer, Terminal functionality is built into RStudio itself.

Interactively with R and sparklyr

Running the following should yield an interactive R session with all prerequisites to start working with the sparklyr package using a local Spark instance.

docker run --rm -it jozefhajnala/sparkly:test R

# Start using sparklyr
library(sparklyr)
sc <- spark_connect("local")

Interactively with the Spark shell

Running the following should yield an interactive Scala REPL instance. A Spark context should be available as sc and a Spark session as spark.

docker run --rm -it jozefhajnala/sparkly:test /root/spark/spark-2.4.3-bin-hadoop2.7/bin/spark-shell

Running an example R script

Running the following should execute an example R script using sparklyr with output appearing in the terminal:

docker run --rm jozefhajnala/sparkly:test Rscript /root/.local/spark_script.R

Manual Installation

The following are very basic instructions; for troubleshooting or more detailed step-by-step guides, refer to RStudio’s Spark website.

install.packages("sparklyr")
install.packages("nycflights13")
sparklyr::spark_install(version = "2.4.3")

Connecting and using a local Spark instance

# Load packages
library(sparklyr)
library(dplyr)
library(nycflights13)

# Connect
sc <- sparklyr::spark_connect(master = "local")

# Copy the weather dataset to the instance
tbl_weather <- dplyr::copy_to(
  dest = sc, 
  df = nycflights13::weather,
  name = "weather",
  overwrite = TRUE
)

# Collect it back
tbl_weather %>% collect()

Sparklyr as a Spark interface provider

The sparklyr package is an R interface to Apache Spark. The meaning of the word interface is very important in this context as the way we use this interface can significantly affect the performance benefits we get from using Spark.

To understand this a bit better, we will examine three very simple functions that differ in implementation but are intended to provide the same results, and look at how they behave with regard to Spark. We will use datasets from the nycflights13 package for our examples.

An R function translated to Spark SQL

Using the following fun_implemented() function will yield the expected results for both a local data frame nycflights13::weather and the remote Spark object referenced by tbl_weather:

# An R function translated to Spark SQL
fun_implemented <- function(df, col) {
  df %>% mutate({{col}} := tolower({{col}}))
}
fun_implemented(nycflights13::weather, origin)
fun_implemented(tbl_weather, origin)

This is because the R function tolower() was translated by dbplyr to the Spark SQL function LOWER, and the resulting query was sent to Spark to be executed. We can see the actual translated SQL by running sql_render() on the function call:

dbplyr::sql_render(
  fun_implemented(tbl_weather, origin)
)
<SQL> SELECT LOWER(`origin`) AS `origin`, `year`, `month`, `day`, `hour`,
`temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, `precip`,
`pressure`, `visib`, `time_hour`
FROM `weather`

An R function not translated to Spark SQL

Using the following fun_r_only() function will only yield the expected results for a local data frame nycflights13::weather. For the remote Spark object referenced by tbl_weather we will get an error:

# An R function not translated to Spark SQL
fun_r_only <- function(df, col) {
  df %>% mutate({{col}} := casefold({{col}}, upper = FALSE))
}
fun_r_only(nycflights13::weather, origin)
fun_r_only(tbl_weather, origin)
 Error: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'AS' expecting ')'(line 1, pos 32)

== SQL ==
SELECT casefold(`origin`, FALSE AS `upper`) AS `origin`, 
`year`, `month`, `day`, `hour`, 
`temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, 
`precip`, `pressure`, `visib`, `time_hour`
--------------------------------^^^
FROM `weather`

This is because dbplyr simply provides no translation for the casefold() function, so the call is passed into the query as-is. The generated Spark SQL is therefore invalid, and the Spark SQL parser throws an error when it tries to parse it.
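One way to check ahead of time whether an expression has a translation is dbplyr::translate_sql(). The sketch below is only an illustration that runs without a Spark instance: it assumes dbplyr's simulated Hive connection, simulate_hive(), as a stand-in for the live sparklyr connection sc, so the exact SQL may differ from what sparklyr generates.

```r
library(dbplyr)

# Assumption: a simulated Hive connection stands in for the Spark connection `sc`
con <- simulate_hive()

# tolower() has a translation, so it is rewritten to a SQL function
sql_known <- translate_sql(tolower(origin), con = con)

# casefold() has no translation, so the call is passed through under its R name
sql_unknown <- translate_sql(casefold(origin, upper = FALSE), con = con)

print(sql_known)
print(sql_unknown)
```

With a live connection, passing con = sc instead should show the translation that would be used for the Spark table.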

A Hive built-in function not existing in R

On the other hand, using the below fun_hive_builtin() function will only yield the expected results for the remote Spark object referenced by tbl_weather. For the local data frame nycflights13::weather we will get an error:

# A Hive built-in function not existing in R
fun_hive_builtin <- function(df, col) {
  df %>% mutate({{col}} := lower({{col}}))
}
fun_hive_builtin(tbl_weather, origin)
fun_hive_builtin(nycflights13::weather, origin)
Error: Evaluation error: could not find function "lower".

This is because the function lower() does not exist in R itself. For a non-existing R function there obviously is no dbplyr translation either. In this case, dbplyr keeps it as-is when translating to SQL, and the SQL is valid and executes without problems because lower is, in fact, a function built into Hive:

dbplyr::sql_render(fun_hive_builtin(tbl_weather, origin))
<SQL> SELECT lower(`origin`) AS `origin`,
`year`, `month`, `day`, `hour`,
`temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`,
`precip`, `pressure`, `visib`, `time_hour`
FROM `weather`

Using non-translated functions with sparklyr

It can easily happen that a function we want to use is neither translated nor a Hive built-in function. For this case, sparklyr provides another interface: the spark_apply() function. Here is an oversimplified example that reaches our goal with casefold():

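# spark_apply() calls this function with a partition of the data as a
# data frame and the value of `context` as the second argument
fun_r_custom <- function(tbl, colName) {
  tbl[[colName]] <- casefold(tbl[[colName]], upper = FALSE)
  tbl
}

# The context expression evaluates to "origin", which is passed on as colName
spark_apply(tbl_weather, fun_r_custom, context = {colName <- "origin"})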
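Because spark_apply() ships the function to R processes running alongside Spark, it can be worth checking first that the function behaves as intended on a small local data frame. A minimal sketch, where local_df is a hypothetical hand-made sample rather than the actual weather data:

```r
# The same custom function as used with spark_apply()
fun_r_custom <- function(tbl, colName) {
  tbl[[colName]] <- casefold(tbl[[colName]], upper = FALSE)
  tbl
}

# Hypothetical small sample with the same column name as the weather table
local_df <- data.frame(
  origin = c("EWR", "JFK", "LGA"),
  temp = c(39.02, 39.92, 41.00),
  stringsAsFactors = FALSE
)

# Apply the function locally; only the selected column should change
res <- fun_r_custom(local_df, "origin")
print(res)
```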

What is so important about this distinction?

We have now shown that we can also send code that was not translated by dbplyr to Spark and get it executed without issues using spark_apply(). So what is the catch and where does the importance of the meaning of the word interface come in?

Let us quickly examine the performance of the operations:

mb <- microbenchmark::microbenchmark(
  times = 10,
  hive_builtin = fun_hive_builtin(tbl_weather, origin) %>% collect(),
  translated_dplyr = fun_implemented(tbl_weather, origin) %>% collect(),
  spark_apply = spark_apply(tbl_weather, fun_r_custom, context = {colName <- "origin"}) %>% collect()
)

Note that the absolute values here will vary based on the setup; the important message is in the relative differences.

We can see that the operations run via the SQL translation mechanism of dbplyr took around 0.5 seconds, while the one run via spark_apply() took orders of magnitude longer: more than 6 minutes.
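Since the relative differences matter more than the absolute timings, it can help to summarize a benchmark in relative units. The sketch below is a small local illustration, not the Spark benchmark itself: it times tolower() against casefold() on a hypothetical character vector x and uses the "relative" unit of the microbenchmark summary method.

```r
library(microbenchmark)

# Hypothetical local input, standing in for the origin column
x <- rep(c("EWR", "JFK", "LGA"), 1000)

# A small local benchmark of two ways to lower-case a character vector
mb_local <- microbenchmark(
  times = 10,
  tolower = tolower(x),
  casefold = casefold(x, upper = FALSE)
)

# Express the timings relative to the fastest expression
rel <- summary(mb_local, unit = "relative")
print(rel[, c("expr", "median")])
```

The same summary call could be applied to the mb object created above, assuming the benchmark has been run against a Spark connection.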

What happens when we use custom functions with spark_apply

We can now see that the operation with spark_apply() is extremely slow compared to the other two. The key to understanding the difference is to examine how the custom transformations of data using R functions are performed within spark_apply(). In simplified terms, this happens in a few steps:

  1. the data is moved in row-format from Spark into the R process through a socket connection. This is inefficient as multiple data types need to be deserialized over each row
  2. the data gets converted to columnar format since this is how R data frames are implemented
  3. the R functions are applied to compute the results
  4. the results are again converted to row-format, serialized row-by-row and sent back to Spark over the socket connection
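The cost of row-by-row processing in the steps above can be illustrated locally. This is only a rough sketch using base R's serialize() on a made-up data frame; it is an assumption-laden analogy, not the serialization routine sparklyr actually uses.

```r
set.seed(1)
n <- 2000

# Made-up data frame standing in for a partition of the weather table
df <- data.frame(
  origin = rep(c("EWR", "JFK"), n / 2),
  temp = runif(n),
  stringsAsFactors = FALSE
)

# Row-oriented: every row is serialized on its own
time_rows <- system.time(
  rows <- lapply(seq_len(nrow(df)), function(i) serialize(df[i, ], NULL))
)

# Column-oriented: every column is serialized as one vector
time_cols <- system.time(
  cols <- lapply(df, function(col) serialize(col, NULL))
)

# Compare the bytes produced and the elapsed time of both approaches
bytes <- c(rows = sum(lengths(rows)), columns = sum(lengths(cols)))
elapsed <- c(rows = time_rows[["elapsed"]], columns = time_cols[["elapsed"]])
print(bytes)
print(elapsed)
```

Even at this small scale, handling each row separately repeats the per-object overhead thousands of times, which is the kind of cost that grows with the size of the data.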

What happens when we use translated or Hive built-in functions

When using functions that can be translated to Spark SQL, the process is very different:

  • The call is translated to Spark SQL using the dbplyr backend
  • The constructed query is sent to Spark for execution using DBI
  • Only when collect() or compute() is called, the SQL is executed within Spark
  • Only when collect() is called the results are also sent to the R session

This means that the transfer of data only happens once and only when collect() is called, which saves a vast amount of overhead.
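The lazy behaviour described above can be observed without a Spark instance. The following sketch assumes dbplyr's lazy_frame() with a simulated Hive connection as a stand-in for tbl_weather; the column values are placeholders, and the generated SQL may differ slightly from what sparklyr produces.

```r
library(dplyr)
library(dbplyr)

# Assumption: a lazy table backed by a simulated connection, not a real Spark table
weather_lazy <- lazy_frame(origin = "EWR", temp = 39.02, con = simulate_hive())

# Building the pipeline executes nothing; it only records the operations
query <- weather_lazy %>% mutate(origin = tolower(origin))

# The recorded operations can be rendered to SQL at any point
sql <- sql_render(query)
print(sql)
```

A simulated table cannot be collected, since there is no database behind it. With a real Spark table, the query would be executed and the data transferred only once collect() is called on the pipeline.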

Which R functionality is currently translated and built-in to Hive

An important question to answer with regards to performance then is what amount of functionality is available using the fast dbplyr backend. As seen above, these features can be categorized into two groups:

  1. R functions translatable to Spark SQL via dbplyr. The full list of such functions is available on RStudio’s sparklyr website

  2. Hive built-in functions that get translated as they are and can be evaluated by Spark. The full list is available on the Hive Operators and User-Defined Functions website.

Making serialization faster with Apache Arrow

What is Apache Arrow and how it improves performance

Our benchmarks have shown that spark_apply() does not scale well: the performance penalty caused by serialization, deserialization, and transfer is too high.

To partially mitigate this we can take advantage of Apache Arrow, a cross-language development platform for in-memory data that specifies a standardized language-independent columnar memory format for flat and hierarchical data.

With Arrow support in sparklyr, Spark performs the row-format to column-format conversion in parallel. The data is then transferred through the socket without any custom serialization, and all the R process needs to do is copy the data from the socket into its heap, transform it, and copy it back to the socket connection.

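This makes the process significantly faster. Note that sparklyr makes use of Arrow once the arrow package is attached, which the benchmark does in its setup step: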

mb <- microbenchmark::microbenchmark(
  times = 10, 
  setup = library(arrow),
  hive_builtin = fun_hive_builtin(tbl_weather, origin) %>% collect(),
  translated_dplyr = fun_implemented(tbl_weather, origin) %>% collect(),
  spark_apply_arrow = spark_apply(tbl_weather, fun_r_custom, context = {colName <- "origin"}) %>% collect()
)

We can see that the timing of spark_apply() decreased from more than 6 minutes to around 4.5 seconds, which is a very significant performance boost. Compared to the other methods, however, we still see an order of magnitude difference.

Notes on the setup of Apache Arrow

It is worth noting that the R package for Apache Arrow arrived on CRAN in early August 2019, which means that at the time of writing it has been on CRAN for about 3 weeks. The functionality also depends on the Arrow C++ library, so installation is a bit more difficult than with some other R packages.

Care should also be taken with regard to the compatibility of the C++ library, the arrow R package version, and the version of sparklyr. We had good results using the R package arrow version 0.14.1, sparklyr 1.0.2, and version 0.14.1 of the C++ libraries.

The aforementioned Docker image has both the C++ libraries and the R arrow package available for use.

The take-home message

Adding Arrow to the mix significantly improved the performance of our example code, but it is still quite slow compared to the native approach. Based on the above, we can conclude that:

Performance benefits are present mainly when all the computation is performed within Spark and R serves merely as a “messaging agent”, sending commands to Spark to be executed. When object serialization and the transfer of larger objects are involved, performance suffers considerably.

The take-home message from this exercise is that we should strive to only use R code that can be executed within the Spark instance. If we need some data retrieved, it is advisable that this is data that was previously heavily aggregated within Spark and only a small amount is transferred to the R session.
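As an illustration of aggregating before transferring, the sketch below builds a grouped summary whose translation is a single SQL aggregation, so only one row per group would need to travel to the R session. It again assumes dbplyr's lazy_frame() with a simulated Hive connection in place of the real tbl_weather, and the column values are placeholders.

```r
library(dplyr)
library(dbplyr)

# Assumption: a simulated lazy table standing in for the Spark weather table
weather_lazy <- lazy_frame(origin = "EWR", temp = 39.02, con = simulate_hive())

# The aggregation is expressed in dplyr verbs that translate to SQL
agg <- weather_lazy %>%
  group_by(origin) %>%
  summarise(mean_temp = mean(temp, na.rm = TRUE))

# The whole computation is rendered as one aggregating query
agg_sql <- sql_render(agg)
print(agg_sql)
```

Against a real Spark table, calling collect() on such a pipeline would bring back only the aggregated result rather than the full dataset.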

But we still need arbitrary R functions to run fast on Spark

In the next installments of this series, we will investigate a few options that allow us to retain the performance of Spark while still being able to write arbitrary R functions. The idea is to rely on methods already implemented and available in the Spark API, including those not directly provided by the sparklyr interface, by:

  1. Rewriting the functions as collections of dplyr verbs that all support translation to Spark SQL
  2. Rewriting the functions as series of Scala method invocations
  3. Rewriting the functions into Spark SQL and using DBI to execute directly

References