Using Spark from R for performance with arbitrary code - Part 5 - Exploring the invoke API from R with Java reflection and examining invokes with logs

Introduction

In the previous parts of this series, we have shown how to write functions both as combinations of dplyr verbs and as SQL query generators that can be executed by Spark, and how to use the lower-level API to invoke methods on Java object references from R.

In this fifth part, we will look in more detail at sparklyr’s invoke() API, investigate the methods available for different classes of objects using the Java reflection API, and look under the hood of the sparklyr interface mechanism with invoke logging.

Preparation

The full setup of Spark and sparklyr is not in the scope of this post; please check the first part for setup instructions and a ready-made Docker image.

If you have Docker available, running

docker run -d -p 8787:8787 -e PASSWORD=pass --name rstudio jozefhajnala/sparkly:add-rstudio

should make RStudio available by navigating to http://localhost:8787 in your browser. You can then log in with the username rstudio and password pass and continue experimenting with the code in this post.

# Load packages
suppressPackageStartupMessages({
  library(sparklyr)
  library(dplyr)
  library(nycflights13)
})

# Connect and copy the flights dataset to the instance
sc <- sparklyr::spark_connect(master = "local")
tbl_flights <- dplyr::copy_to(sc, nycflights13::flights, "flights")

Examining available methods from R

If you have not done so yet, we recommend reading the previous part of this series before this one to get a quick overview of the invoke() API.

Using the Java reflection API to list the available methods

The invoke() interface is powerful, but also a bit hidden from view, as we do not immediately know which methods are available for which object classes. We can work around this using the getMethods method, which (in short) returns an array of Method objects reflecting the public member methods of the class.

For instance, retrieving a list of methods for the org.apache.spark.SparkContext class:

mthds <- sc %>% spark_context() %>%
  invoke("getClass") %>%
  invoke("getMethods")
head(mthds)
## [[1]]
## <jobj[54]>
##   java.lang.reflect.Method
##   public org.apache.spark.util.CallSite org.apache.spark.SparkContext.org$apache$spark$SparkContext$$creationSite()
## 
## [[2]]
## <jobj[55]>
##   java.lang.reflect.Method
##   public org.apache.spark.SparkConf org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_conf()
## 
## [[3]]
## <jobj[56]>
##   java.lang.reflect.Method
##   public org.apache.spark.SparkEnv org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_env()
## 
## [[4]]
## <jobj[57]>
##   java.lang.reflect.Method
##   public scala.Option org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_progressBar()
## 
## [[5]]
## <jobj[58]>
##   java.lang.reflect.Method
##   public scala.Option org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_ui()
## 
## [[6]]
## <jobj[59]>
##   java.lang.reflect.Method
##   public org.apache.spark.rpc.RpcEndpointRef org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_heartbeatReceiver()

We can see that the invoke() chain has returned a list of Java object references, each of them of class java.lang.reflect.Method. This is a good result, but the output is not very user-friendly from an R user’s perspective. Let us write a small wrapper that returns some of a method’s details in a more readable fashion, for instance the return type and an overview of the parameters:

getMethodDetails <- function(mthd) {
  returnType <- mthd %>% invoke("getReturnType") %>% invoke("toString")
  params <- mthd %>% invoke("getParameters")
  params <- vapply(params, invoke, "toString", FUN.VALUE = character(1))
  c(returnType = returnType, params = paste(params, collapse = ", "))
}
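As a quick sanity check, we can apply the wrapper to the first of the Method references retrieved above; the exact output will depend on the Spark version in use, so none is shown here:

```r
# Return type and parameter overview of the first retrieved method
# (assumes `mthds` from the previous chunk and an open connection)
getMethodDetails(mthds[[1]])
```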

Finally, to get a nice overview, we can write another helper function that returns a named list of methods for an object’s class, including their return types and an overview of their parameters:

getAvailableMethods <- function(jobj) {
  mthds <- jobj %>% invoke("getClass") %>% invoke("getMethods")
  nms <- vapply(mthds, invoke, "getName", FUN.VALUE = character(1))
  res <- lapply(mthds, getMethodDetails)
  names(res) <- nms
  res
}

Investigating Dataset and SparkContext class methods

Using the function defined above, we can explore the methods available on a DataFrame reference, showing a few of the names first:

dfMethods <- tbl_flights %>% spark_dataframe() %>%
  getAvailableMethods()

# Show some method names:
dfMethodNames <- sort(unique(names(dfMethods)))
head(dfMethodNames, 20)
##  [1] "agg"                           "alias"                        
##  [3] "apply"                         "as"                           
##  [5] "cache"                         "checkpoint"                   
##  [7] "coalesce"                      "col"                          
##  [9] "collect"                       "collectAsArrowToPython"       
## [11] "collectAsList"                 "collectToPython"              
## [13] "colRegex"                      "columns"                      
## [15] "count"                         "createGlobalTempView"         
## [17] "createOrReplaceGlobalTempView" "createOrReplaceTempView"      
## [19] "createTempView"                "crossJoin"

If we would like to see more detail, we can now investigate further, for instance by listing the different parameter signatures that the agg method supports:

sort(vapply(
  dfMethods[names(dfMethods) == "agg"], 
  `[[`, "params",
  FUN.VALUE = character(1)
))
##                                                                                                                                  agg 
##                                                                             "java.util.Map<java.lang.String, java.lang.String> arg0" 
##                                                                                                                                  agg 
##                                                              "org.apache.spark.sql.Column arg0, org.apache.spark.sql.Column... arg1" 
##                                                                                                                                  agg 
##                                           "org.apache.spark.sql.Column arg0, scala.collection.Seq<org.apache.spark.sql.Column> arg1" 
##                                                                                                                                  agg 
##                                                            "scala.collection.immutable.Map<java.lang.String, java.lang.String> arg0" 
##                                                                                                                                  agg 
## "scala.Tuple2<java.lang.String, java.lang.String> arg0, scala.collection.Seq<scala.Tuple2<java.lang.String, java.lang.String>> arg1"

Similarly, we can look at the SparkContext class and show some of the available methods that can be invoked:

scMethods <- sc %>% spark_context() %>%
  getAvailableMethods()
scMethodNames <- sort(unique(names(scMethods)))
head(scMethodNames, 60)
##  [1] "$lessinit$greater$default$3" "$lessinit$greater$default$4"
##  [3] "$lessinit$greater$default$5" "accumulable"                
##  [5] "accumulableCollection"       "accumulator"                
##  [7] "addedFiles"                  "addedJars"                  
##  [9] "addFile"                     "addJar"                     
## [11] "addSparkListener"            "applicationAttemptId"       
## [13] "applicationId"               "appName"                    
## [15] "assertNotStopped"            "binaryFiles"                
## [17] "binaryFiles$default$2"       "binaryRecords"              
## [19] "binaryRecords$default$3"     "broadcast"                  
## [21] "cancelAllJobs"               "cancelJob"                  
## [23] "cancelJobGroup"              "cancelStage"                
## [25] "checkpointDir"               "checkpointDir_$eq"          
## [27] "checkpointFile"              "clean"                      
## [29] "clean$default$2"             "cleaner"                    
## [31] "clearCallSite"               "clearJobGroup"              
## [33] "collectionAccumulator"       "conf"                       
## [35] "createSparkEnv"              "dagScheduler"               
## [37] "dagScheduler_$eq"            "defaultMinPartitions"       
## [39] "defaultParallelism"          "deployMode"                 
## [41] "doubleAccumulator"           "emptyRDD"                   
## [43] "env"                         "equals"                     
## [45] "eventLogCodec"               "eventLogDir"                
## [47] "eventLogger"                 "executorAllocationManager"  
## [49] "executorEnvs"                "executorMemory"             
## [51] "files"                       "getAllPools"                
## [53] "getCallSite"                 "getCheckpointDir"           
## [55] "getClass"                    "getConf"                    
## [57] "getExecutorIds"              "getExecutorMemoryStatus"    
## [59] "getExecutorThreadDump"       "getLocalProperties"

Using helpers to explore the methods

We can also use the helper functions to investigate further. For instance, we see that there is a getConf method available to us. Looking at the returned object reference alone does not provide much useful information, however, so we can list the methods for that class and look for "get" methods that would show us the configuration:

spark_conf <- sc %>% spark_context() %>% invoke("conf")
spark_conf_methods <- spark_conf %>% getAvailableMethods() 
spark_conf_get_methods <- spark_conf_methods %>%
  names() %>%
  grep(pattern = "get", ., value = TRUE) %>%
  sort()
spark_conf_get_methods
##  [1] "get"                 "get"                 "get"                
##  [4] "getAll"              "getAllWithPrefix"    "getAppId"           
##  [7] "getAvroSchema"       "getBoolean"          "getClass"           
## [10] "getDeprecatedConfig" "getDouble"           "getenv"             
## [13] "getExecutorEnv"      "getInt"              "getLong"            
## [16] "getOption"           "getSizeAsBytes"      "getSizeAsBytes"     
## [19] "getSizeAsBytes"      "getSizeAsGb"         "getSizeAsGb"        
## [22] "getSizeAsKb"         "getSizeAsKb"         "getSizeAsMb"        
## [25] "getSizeAsMb"         "getTimeAsMs"         "getTimeAsMs"        
## [28] "getTimeAsSeconds"    "getTimeAsSeconds"    "getWithSubstitution"

We see that there is a getAll method that could prove useful, returning an array of tuples and taking no arguments as input:

# Returns a list of tuples, takes no arguments:
spark_conf_methods[["getAll"]]
##              returnType                  params 
## "class [Lscala.Tuple2;"                      ""
# Invoke the `getAll` method and look at part of the result
spark_confs <- spark_conf %>% invoke("getAll")
spark_confs <- vapply(spark_confs, invoke, "toString", FUN.VALUE = character(1))
sort(spark_confs)[c(2, 3, 12, 14)]
## [1] "(spark.app.name,sparklyr)"             
## [2] "(spark.driver.host,localhost)"         
## [3] "(spark.sql.catalogImplementation,hive)"
## [4] "(spark.submit.deployMode,client)"

Looking at the Scala documentation for the getAll method, we see that our output is actually missing some information - the classes of the objects in the tuples, which in this case is scala.Tuple2<java.lang.String,java.lang.String>[].

We could therefore improve our helper to provide more detail on the return value.
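As a sketch of one such improvement, we could also convert the tuples themselves into a named R character vector by invoking the Tuple2 accessor methods _1 and _2 on each element. The tuplesToVector helper below is hypothetical and assumes a list of the raw tuple references as returned by invoke("getAll"):

```r
# Convert a list of scala.Tuple2<String, String> references
# into a named character vector of configuration values
tuplesToVector <- function(tuples) {
  keys <- vapply(tuples, invoke, "_1", FUN.VALUE = character(1))
  vals <- vapply(tuples, invoke, "_2", FUN.VALUE = character(1))
  setNames(vals, keys)
}

# Usage, re-retrieving the raw tuple references:
# conf_vec <- spark_conf %>% invoke("getAll") %>% tuplesToVector()
```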

Unexported helpers provided by sparklyr

The sparklyr package itself provides facilities of a similar nature to those above. Let us look at some of them, even though they are not exported:

sparklyr:::jobj_class(spark_conf)
## [1] "SparkConf" "Object"
sparklyr:::jobj_info(spark_conf)$class
## [1] "org.apache.spark.SparkConf"
capture.output(sparklyr:::jobj_inspect(spark_conf)) %>% head(10)
##  [1] "<jobj[1645]>"                                                                                                                   
##  [2] "  org.apache.spark.SparkConf"                                                                                                   
##  [3] "  org.apache.spark.SparkConf@7ec389e7"                                                                                          
##  [4] "Fields:"                                                                                                                        
##  [5] "<jobj[2490]>"                                                                                                                   
##  [6] "  java.lang.reflect.Field"                                                                                                      
##  [7] "  private final java.util.concurrent.ConcurrentHashMap org.apache.spark.SparkConf.org$apache$spark$SparkConf$$settings"         
##  [8] "<jobj[2491]>"                                                                                                                   
##  [9] "  java.lang.reflect.Field"                                                                                                      
## [10] "  private transient org.apache.spark.internal.config.ConfigReader org.apache.spark.SparkConf.org$apache$spark$SparkConf$$reader"

How sparklyr communicates with Spark, invoke logging

Now that we have an overview of the invoke() interface, we can take a look under the hood of sparklyr and see how it actually communicates with the Spark instance. In fact, the communication is a set of invocations that can differ significantly depending on which of the approaches we choose for our purposes.

To obtain this information, we use the sparklyr.log.invoke property, which can take one of the following three values:

  • TRUE will use message() to communicate short info on what is being invoked
  • "cat" will use cat() to communicate short info on what is being invoked
  • "callstack" will use message() to communicate short info on what is being invoked and the callstack

We will use TRUE in this article to keep the output short and easily manageable. First, we will close the previous connection, create a new one with sparklyr.log.invoke set to TRUE in the configuration, and copy in the flights dataset:

sparklyr::spark_disconnect(sc)
## NULL
config <- sparklyr::spark_config()
config$sparklyr.log.invoke <- TRUE
suppressMessages({
  sc <- sparklyr::spark_connect(master = "local", config = config)
  tbl_flights <- dplyr::copy_to(sc, nycflights13::flights, "flights")
})

Using dplyr verbs translated with dbplyr

Now that the setup is complete, we use the dplyr verb approach to retrieve the count of rows and look at the invocations this entails:

tbl_flights %>% dplyr::count()
## Invoking sql
## Invoking sql
## Invoking columns
## Invoking isStreaming
## Invoking sql
## Invoking isStreaming
## Invoking sql
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking sql
## Invoking columns
## # Source: spark<?> [?? x 1]
##        n
##    <dbl>
## 1 336776

We see multiple invocations of the sql method and also the columns method. This makes sense, since the dplyr verb approach works by translating the commands into Spark SQL via dbplyr and then sending the translated queries to Spark via that interface.
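Since the translation is done by dbplyr, we can also look at the generated Spark SQL itself without executing the query, for example using dplyr::show_query(); the exact rendering may vary between dbplyr versions:

```r
# Print the Spark SQL that dbplyr generates for the verb chain,
# without collecting any results
tbl_flights %>% dplyr::count() %>% dplyr::show_query()
```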

Using DBI to send queries

Similarly, we can investigate the invocations that happen when we try to retrieve the same results via the DBI interface:

DBI::dbGetQuery(sc, "SELECT count(1) AS n FROM flights")
## Invoking sql
## Invoking isStreaming
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
##        n
## 1 336776

We see slightly fewer invocations compared to the dplyr approach above, but the output is also less processed.

Using the invoke interface

Looking at the invocations that get executed using the invoke() interface:

tbl_flights %>% spark_dataframe() %>% invoke("count")
## Invoking sql
## Invoking count
## [1] 336776

We see that the number of invocations is much lower. The initial sql invocation comes from the first part of the pipe, while the invoke("count") part translated to exactly one invocation of the count method. The invoke() interface is therefore indeed a lower-level interface that invokes methods as we request them, with little to no overhead related to translations and other effects.

Redirecting the invoke logs

When running R applications that use Spark as a computation engine, it is useful to get detailed invoke logs for debugging and diagnostic purposes. When implementing such mechanisms, we need to take into consideration how R handles the invoke logs produced by sparklyr. In simple terms, the invoke logs produced when using

  • TRUE and "callstack" are created using message(), which means they get sent to the stderr() connection by default
  • "cat" are created using cat(), so they get sent to stdout() connection by default

This info can prove useful when redirecting the log information from standard output and standard error to different logging targets.
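For illustration, one simple way to divert the message stream (and with it the invoke logs produced by TRUE or "callstack") to a file from within R is sink() with type = "message"; the file name here is just an example:

```r
# Open a text connection and divert messages (stderr) to it
log_con <- file("invoke.log", open = "wt")
sink(log_con, type = "message")

# ... Spark operations producing invoke logs would go here ...

# Restore the default message stream and close the file
sink(type = "message")
close(log_con)
```

For the "cat" option, the same approach with sink(log_con) (the default type = "output") would capture the standard output instead.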


Conclusion

In this part of the series, we have looked at using the Java reflection API with sparklyr’s invoke() interface to get useful insight on available methods for different object types that can be used in the context of Spark, but also in other contexts. Using invoke logging, we have also shown how the different sparklyr interfacing methods communicate with Spark under the hood.