Introduction
In the previous part of this series, we looked at writing R functions that Spark can execute without serialization overhead, focusing on composing dplyr verbs and investigating how the SQL gets generated and the Spark plans created.
In this third part, we look at how to write R functions that generate SQL queries for Spark to execute, how to run them with DBI, and how to create lazy SQL statements that only get executed when needed. We also briefly show how to wrap these approaches into functions that can be combined with other Spark operations.
Preparation
The full setup of Spark and sparklyr is not in the scope of this post; please check the previous one for setup instructions and a ready-made Docker image.
If you have Docker available, running
docker run -d -p 8787:8787 -e PASSWORD=pass --name rstudio jozefhajnala/sparkly:add-rstudio
should make RStudio available at http://localhost:8787 in your browser. You can then log in with the user name rstudio and the password pass and continue experimenting with the code in this post.
# Load packages
suppressPackageStartupMessages({
library(sparklyr)
library(dplyr)
library(nycflights13)
})
# Prepare the data
weather <- nycflights13::weather %>%
mutate(id = 1L:nrow(nycflights13::weather)) %>%
select(id, everything())
# Connect
sc <- sparklyr::spark_connect(master = "local")
# Copy the weather dataset to the instance
tbl_weather <- dplyr::copy_to(
dest = sc,
df = weather,
name = "weather",
overwrite = TRUE
)
# Copy the flights dataset to the instance
tbl_flights <- dplyr::copy_to(
dest = sc,
df = nycflights13::flights,
name = "flights",
overwrite = TRUE
)
R functions as Spark SQL generators
There are use cases where it is desirable to express the operations directly with SQL instead of combining dplyr verbs, for example when working within multi-language environments where re-usability is important. We can then send the SQL query directly to Spark to be executed. To create such queries, one option is to write R functions that work as query constructors.
Again using a very simple example, a naive implementation of column normalization could look as follows. Note that the use of SELECT * is discouraged and is used here only for illustration purposes:
normalize_sql <- function(df, colName, newColName) {
paste0(
"SELECT",
"\n ", df, ".*", ",",
"\n (", colName, " - (SELECT avg(", colName, ") FROM ", df, "))",
" / ",
"(SELECT stddev_samp(", colName,") FROM ", df, ") as ", newColName,
"\n", "FROM ", df
)
}
Using the weather dataset would then yield the following SQL query when normalizing the temp column:
normalize_temp_query <- normalize_sql("weather", "temp", "normTemp")
cat(normalize_temp_query)
## SELECT
## weather.*,
## (temp - (SELECT avg(temp) FROM weather)) / (SELECT stddev_samp(temp) FROM weather) as normTemp
## FROM weather
Now that we have the query created, we can look at how to send it to Spark for execution.
Executing the generated queries via Spark
Using DBI as the interface
The R package DBI provides an interface for communication between R and relational database management systems. We can simply use the dbGetQuery() function to execute our query, for instance:
res <- DBI::dbGetQuery(sc, statement = normalize_temp_query)
head(res)
## id origin year month day hour temp dewp humid wind_dir wind_speed
## 1 1 EWR 2013 1 1 1 39.02 26.06 59.37 270 10.35702
## 2 2 EWR 2013 1 1 2 39.02 26.96 61.63 250 8.05546
## 3 3 EWR 2013 1 1 3 39.02 28.04 64.43 240 11.50780
## 4 4 EWR 2013 1 1 4 39.92 28.04 62.21 250 12.65858
## 5 5 EWR 2013 1 1 5 39.02 28.04 64.43 260 12.65858
## 6 6 EWR 2013 1 1 6 37.94 28.04 67.21 240 11.50780
## wind_gust precip pressure visib time_hour normTemp
## 1 NaN 0 1012.0 10 2013-01-01 06:00:00 -0.9130047
## 2 NaN 0 1012.3 10 2013-01-01 07:00:00 -0.9130047
## 3 NaN 0 1012.5 10 2013-01-01 08:00:00 -0.9130047
## 4 NaN 0 1012.2 10 2013-01-01 09:00:00 -0.8624083
## 5 NaN 0 1011.9 10 2013-01-01 10:00:00 -0.9130047
## 6 NaN 0 1012.4 10 2013-01-01 11:00:00 -0.9737203
As the printed output suggests, a standard data frame is returned, as opposed to the tibbles returned by most sparklyr operations.
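A quick way to confirm this (a minimal check, not shown in the original output) is to inspect the class of the returned object:
# dbGetQuery() returns a plain data.frame rather than a tibble or a lazy reference
class(res)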
It is important to note that using dbGetQuery() automatically computes and collects the results to the R session. This is in contrast with the dplyr approach, which constructs the query and only collects the results to the R session when collect() is called, or computes them when compute() is called.
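As a minimal illustration of that contrast (this sketch is not part of the original post and simply reuses tbl_weather from the setup above), a dplyr pipeline on a Spark DataFrame builds the query lazily and nothing is executed until we ask for the results:
# Nothing is sent to Spark for execution yet, only a lazy query is built
lazy_ewr <- tbl_weather %>%
  filter(origin == "EWR") %>%
  head(10)

# The query is executed and the results retrieved only now
lazy_ewr %>% collect()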
We will now examine two options for using the prepared query lazily, without collecting the results into the R session.
Invoking sql on a Spark session object
Without going into further detail on the invoke() functionality of sparklyr, which we will focus on in the fourth installment of the series: if we want a “lazy” SQL query that does not get automatically computed and collected when called from R, we can invoke the sql method on a SparkSession class object.
The method takes a string SQL query as input and processes it with Spark, returning the result as a Spark DataFrame. This gives us the ability to compute and collect the results only when desired:
# Use the query "lazily" without execution:
normalized_lazy_ds <- sc %>%
spark_session() %>%
invoke("sql", normalize_temp_query)
normalized_lazy_ds
## <jobj[124]>
## org.apache.spark.sql.Dataset
## [id: int, origin: string ... 15 more fields]
# Collect when needed:
normalized_lazy_ds %>% collect()
## # A tibble: 26,115 x 17
## id origin year month day hour temp dewp humid wind_dir
## <int> <chr> <dbl> <dbl> <int> <int> <dbl> <dbl> <dbl> <dbl>
## 1 1 EWR 2013 1 1 1 39.0 26.1 59.4 270
## 2 2 EWR 2013 1 1 2 39.0 27.0 61.6 250
## 3 3 EWR 2013 1 1 3 39.0 28.0 64.4 240
## 4 4 EWR 2013 1 1 4 39.9 28.0 62.2 250
## 5 5 EWR 2013 1 1 5 39.0 28.0 64.4 260
## 6 6 EWR 2013 1 1 6 37.9 28.0 67.2 240
## 7 7 EWR 2013 1 1 7 39.0 28.0 64.4 240
## 8 8 EWR 2013 1 1 8 39.9 28.0 62.2 250
## 9 9 EWR 2013 1 1 9 39.9 28.0 62.2 260
## 10 10 EWR 2013 1 1 10 41 28.0 59.6 260
## # … with 26,105 more rows, and 7 more variables: wind_speed <dbl>,
## # wind_gust <dbl>, precip <dbl>, pressure <dbl>, visib <dbl>,
## # time_hour <dttm>, normTemp <dbl>
Using tbl with dbplyr’s sql
The above method gives us a reference to a Java object as a result, which might be less intuitive for R users to work with. We can also opt to use dbplyr’s sql() function in combination with tbl() to get a more familiar result.
Note that when printing the normalized_lazy_tbl below, the query gets partially executed to provide the first few rows. Only when collect() is called is the entire result set retrieved to the R session:
# Nothing is executed yet
normalized_lazy_tbl <- normalize_temp_query %>%
dbplyr::sql() %>%
tbl(sc, .)
# Print the first few rows
normalized_lazy_tbl
## # Source: spark<SELECT weather.*, (temp - (SELECT avg(temp) FROM weather))
## # / (SELECT stddev_samp(temp) FROM weather) as normTemp FROM weather>
## # [?? x 17]
## id origin year month day hour temp dewp humid wind_dir
## <int> <chr> <dbl> <dbl> <int> <int> <dbl> <dbl> <dbl> <dbl>
## 1 1 EWR 2013 1 1 1 39.0 26.1 59.4 270
## 2 2 EWR 2013 1 1 2 39.0 27.0 61.6 250
## 3 3 EWR 2013 1 1 3 39.0 28.0 64.4 240
## 4 4 EWR 2013 1 1 4 39.9 28.0 62.2 250
## 5 5 EWR 2013 1 1 5 39.0 28.0 64.4 260
## 6 6 EWR 2013 1 1 6 37.9 28.0 67.2 240
## 7 7 EWR 2013 1 1 7 39.0 28.0 64.4 240
## 8 8 EWR 2013 1 1 8 39.9 28.0 62.2 250
## 9 9 EWR 2013 1 1 9 39.9 28.0 62.2 260
## 10 10 EWR 2013 1 1 10 41 28.0 59.6 260
## # … with more rows, and 7 more variables: wind_speed <dbl>,
## # wind_gust <dbl>, precip <dbl>, pressure <dbl>, visib <dbl>,
## # time_hour <dttm>, normTemp <dbl>
# Collect the entire result to the R session and print
normalized_lazy_tbl %>% collect()
## # A tibble: 26,115 x 17
## id origin year month day hour temp dewp humid wind_dir
## <int> <chr> <dbl> <dbl> <int> <int> <dbl> <dbl> <dbl> <dbl>
## 1 1 EWR 2013 1 1 1 39.0 26.1 59.4 270
## 2 2 EWR 2013 1 1 2 39.0 27.0 61.6 250
## 3 3 EWR 2013 1 1 3 39.0 28.0 64.4 240
## 4 4 EWR 2013 1 1 4 39.9 28.0 62.2 250
## 5 5 EWR 2013 1 1 5 39.0 28.0 64.4 260
## 6 6 EWR 2013 1 1 6 37.9 28.0 67.2 240
## 7 7 EWR 2013 1 1 7 39.0 28.0 64.4 240
## 8 8 EWR 2013 1 1 8 39.9 28.0 62.2 250
## 9 9 EWR 2013 1 1 9 39.9 28.0 62.2 260
## 10 10 EWR 2013 1 1 10 41 28.0 59.6 260
## # … with 26,105 more rows, and 7 more variables: wind_speed <dbl>,
## # wind_gust <dbl>, precip <dbl>, pressure <dbl>, visib <dbl>,
## # time_hour <dttm>, normTemp <dbl>
Wrapping the tbl approach into functions
In the approach above we provided sc in the call to tbl(). When wrapping such processes into a function, it might however be useful to take the specific DataFrame reference as an input instead of the generic Spark connection reference.
In that case, we can use the fact that the connection reference is also stored in the DataFrame reference, in the con sub-element of the src element. For instance, looking at our tbl_weather:
class(tbl_weather[["src"]][["con"]])
## [1] "spark_connection" "spark_shell_connection"
## [3] "DBIConnection"
Putting this together, we can create a simple wrapper function that lazily sends a SQL query to be processed on a particular Spark DataFrame reference:
lazy_spark_query <- function(tbl, qry) {
qry %>%
dbplyr::sql() %>%
dplyr::tbl(tbl[["src"]][["con"]], .)
}
And use it to do the same as we did above with a single function call:
lazy_spark_query(tbl_weather, normalize_temp_query) %>%
collect()
## # A tibble: 26,115 x 17
## id origin year month day hour temp dewp humid wind_dir
## <int> <chr> <dbl> <dbl> <int> <int> <dbl> <dbl> <dbl> <dbl>
## 1 1 EWR 2013 1 1 1 39.0 26.1 59.4 270
## 2 2 EWR 2013 1 1 2 39.0 27.0 61.6 250
## 3 3 EWR 2013 1 1 3 39.0 28.0 64.4 240
## 4 4 EWR 2013 1 1 4 39.9 28.0 62.2 250
## 5 5 EWR 2013 1 1 5 39.0 28.0 64.4 260
## 6 6 EWR 2013 1 1 6 37.9 28.0 67.2 240
## 7 7 EWR 2013 1 1 7 39.0 28.0 64.4 240
## 8 8 EWR 2013 1 1 8 39.9 28.0 62.2 250
## 9 9 EWR 2013 1 1 9 39.9 28.0 62.2 260
## 10 10 EWR 2013 1 1 10 41 28.0 59.6 260
## # … with 26,105 more rows, and 7 more variables: wind_speed <dbl>,
## # wind_gust <dbl>, precip <dbl>, pressure <dbl>, visib <dbl>,
## # time_hour <dttm>, normTemp <dbl>
Combining multiple approaches and functions into lazy datasets
The power of Spark partly comes from its lazy execution, and we can take advantage of this in ways that are not immediately obvious. Consider the following function that we defined previously:
lazy_spark_query
## function(tbl, qry) {
## qry %>%
## dbplyr::sql() %>%
## dplyr::tbl(tbl[["src"]][["con"]], .)
## }
Since the output of this function without collection is actually only a translated SQL statement, we can take that output and keep combining it with other operations, for instance:
qry <- normalize_sql("flights", "dep_delay", "dep_delay_norm")
lazy_spark_query(tbl_flights, qry) %>%
group_by(origin) %>%
summarise(mean(dep_delay_norm)) %>%
collect()
## Warning: Missing values are always removed in SQL.
## Use `mean(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## # A tibble: 3 x 2
## origin `mean(dep_delay_norm)`
## <chr> <dbl>
## 1 EWR 0.0614
## 2 JFK -0.0131
## 3 LGA -0.0570
The crucial advantage is that even though lazy_spark_query would return the entire updated flights dataset when collected on its own, in combination with other operations Spark first figures out how to execute all of them together efficiently, only then physically executes them, and returns just the grouped and aggregated data to the R session.
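One way to see this (a minimal sketch, not part of the original post) is to inspect the combined query with dplyr::show_query(): the aggregation is wrapped around the generated normalization SQL, so only the small aggregated result has to travel back to the R session:
# Show the SQL that Spark will execute for the combined pipeline
lazy_spark_query(tbl_flights, qry) %>%
  group_by(origin) %>%
  summarise(mean_delay_norm = mean(dep_delay_norm, na.rm = TRUE)) %>%
  dplyr::show_query()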
We can therefore effectively combine multiple approaches to interfacing with Spark while still keeping the benefit of retrieving only very small, aggregated amounts of data to the R session. The effect is quite significant even with a dataset as small as flights (336,776 rows of 19 columns) and with a local Spark instance. The chart below compares executing the query lazily, aggregating within Spark and only retrieving the aggregated data, versus retrieving the full result first and aggregating locally. The third boxplot shows the cost of pure collection of the query results:
bench <- microbenchmark::microbenchmark(
times = 20,
collect_late = lazy_spark_query(tbl_flights, qry) %>%
group_by(origin) %>%
summarise(mean(dep_delay_norm)) %>%
collect(),
collect_first = lazy_spark_query(tbl_flights, qry) %>%
collect() %>%
group_by(origin) %>%
summarise(mean(dep_delay_norm)),
collect_only = lazy_spark_query(tbl_flights, qry) %>%
collect()
)
Where SQL can be better than dbplyr translation
When a translation is not available
We discussed in the first part that the set of operations translated to Spark SQL via dbplyr may not cover all possible use cases. In such cases, the option to write SQL directly is very useful.
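As a hypothetical illustration (not from the original post), Spark SQL generator functions such as explode() have no direct dplyr verb translation, but the corresponding SQL can be sent as-is:
# explode() turns an array into rows; there is no direct dplyr equivalent
"SELECT origin, explode(array(dep_delay, arr_delay)) AS delay FROM flights" %>%
  dbplyr::sql() %>%
  tbl(sc, .)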
When translation does not provide expected results
In some instances using dbplyr to translate R operations to Spark SQL can lead to unexpected results. As one example, consider the following integer division on a column of a local data frame.
# id_div_5 is as expected
weather %>%
mutate(id_div_5 = id %/% 5L) %>%
select(id, id_div_5)
## # A tibble: 26,115 x 2
## id id_div_5
## <int> <int>
## 1 1 0
## 2 2 0
## 3 3 0
## 4 4 0
## 5 5 1
## 6 6 1
## 7 7 1
## 8 8 1
## 9 9 1
## 10 10 2
## # … with 26,105 more rows
As expected, we get the result of integer division in the id_div_5 column. However, applying the very same operation on a Spark DataFrame yields unexpected results:
# id_div_5 is normal division, not integer division
tbl_weather %>%
mutate(id_div_5 = id %/% 5L) %>%
select(id, id_div_5)
## # Source: spark<?> [?? x 2]
## id id_div_5
## <int> <dbl>
## 1 1 0.2
## 2 2 0.4
## 3 3 0.6
## 4 4 0.8
## 5 5 1
## 6 6 1.2
## 7 7 1.4
## 8 8 1.6
## 9 9 1.8
## 10 10 2
## # … with more rows
This is due to the fact that a translation for integer division is quite difficult to implement: https://github.com/tidyverse/dbplyr/issues/108. We could certainly figure out a way to fix this particular issue, but the workarounds may prove inefficient:
tbl_weather %>%
mutate(id_div_5 = as.integer(id %/% 5L)) %>%
select(id, id_div_5)
## # Source: spark<?> [?? x 2]
## id id_div_5
## <int> <int>
## 1 1 0
## 2 2 0
## 3 3 0
## 4 4 0
## 5 5 1
## 6 6 1
## 7 7 1
## 8 8 1
## 9 9 1
## 10 10 2
## # … with more rows
# Not too efficient:
tbl_weather %>%
mutate(id_div_5 = as.integer(id %/% 5L)) %>%
select(id, id_div_5) %>%
explain()
## <SQL>
## SELECT `id`, CAST(`id` / 5 AS INT) AS `id_div_5`
## FROM `weather`
##
## <PLAN>
## == Physical Plan ==
## *(1) Project [id#24, cast((cast(id#24 as double) / 5.0) as int) AS id_div_5#4273]
## +- InMemoryTableScan [id#24]
## +- InMemoryRelation [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39], StorageLevel(disk, memory, deserialized, 1 replicas)
## +- Scan ExistingRDD[id#24,origin#25,year#26,month#27,day#28,hour#29,temp#30,dewp#31,humid#32,wind_dir#33,wind_speed#34,wind_gust#35,precip#36,pressure#37,visib#38,time_hour#39]
Using SQL and the knowledge that Hive provides a built-in DIV arithmetic operator, we can get the desired results very simply and efficiently by writing the SQL directly:
"SELECT `id`, `id` DIV 5 `id_div_5` FROM `weather`" %>%
dbplyr::sql() %>%
tbl(sc, .)
## # Source: spark<SELECT `id`, `id` DIV 5 `id_div_5` FROM `weather`> [?? x
## # 2]
## id id_div_5
## <int> <dbl>
## 1 1 0
## 2 2 0
## 3 3 0
## 4 4 0
## 5 5 1
## 6 6 1
## 7 7 1
## 8 8 1
## 9 9 1
## 10 10 2
## # … with more rows
Even though the numeric value of the results is correct here, we may still notice that the class of the returned id_div_5 column is actually numeric instead of integer. Such is the life of developers using data processing interfaces.
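A possible remedy (a hedged sketch, not part of the original post) is to cast the result explicitly in the SQL, which should also bring an integer column back to the R session:
# Cast the integer division result to INT explicitly
"SELECT `id`, CAST(`id` DIV 5 AS INT) AS `id_div_5` FROM `weather`" %>%
  dbplyr::sql() %>%
  tbl(sc, .)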
When portability is important
Since the languages that provide interfaces to Spark are not limited to R and multi-language setups are quite common, another reason to use SQL statements directly is the portability of such solutions. A SQL statement can be executed through the interfaces provided for the other languages as well, such as Scala, Java, and Python, without the need to rely on R-specific packages such as dbplyr.
References
- The first part of this series
- The second part of this series
- Documentation on Hive Operators and User-Defined Functions website.
- A Docker image with R, Spark, sparklyr and Arrow available and its Dockerfile.
- The DBI package on CRAN