Six-point checklist for Spark job optimization

Photo by Greta Bartolini on Unsplash (https://unsplash.com/@gretad3500)


1. Use the correct file format

  • If you are dealing with tabular data, choose the Apache Parquet format. It is a columnar format optimized for queries (transformations) that read a subset of columns from a large DataFrame.
  • If Spark is used with Databricks, the Delta format is also particularly interesting, since it offers automatic optimization tools. A minimal sketch of both follows this list.
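
As an illustration, a minimal PySpark sketch of writing the same DataFrame as Parquet and as Delta. The paths and the CSV input are placeholders, and the Delta write assumes the delta-spark package (or a Databricks runtime) is available.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("file-formats").getOrCreate()

# Hypothetical raw input: a CSV file with a header row
df = spark.read.option("header", True).csv("/data/raw/events.csv")

# Columnar Parquet: efficient when queries touch only a subset of columns
df.write.mode("overwrite").parquet("/data/curated/events_parquet")

# Delta: requires the delta-spark package or a Databricks runtime
df.write.format("delta").mode("overwrite").save("/data/curated/events_delta")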

2. Maximize parallelism by splitting data using partitions 

  • To read and process data in parallel, we need to split data into multiple partitions.
  • Spark runs one task per thread, with one thread per available CPU core, and each task processes a single partition. (Configure at least as many partitions as there are available CPU cores.)
  • The goal is to split each stage of the job into enough tasks to keep every core busy.
  • How to create partitions? With repartition() or coalesce(), as sketched below.
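
A minimal PySpark sketch, assuming an existing DataFrame df and a hypothetical customer_id column; the partition counts are illustrative (roughly 2x the cores of a hypothetical 48-core cluster).

# Inspect the current number of partitions
print(df.rdd.getNumPartitions())

# Full shuffle into 96 partitions (illustrative: ~2x the cores of a 48-core cluster)
df_repart = df.repartition(96)

# Partition by a column so rows with the same key end up in the same partition
df_by_key = df.repartition(96, "customer_id")

# Reduce the number of partitions without a full shuffle, e.g. before writing a small output
df_small = df_repart.coalesce(8)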

3. Handle shuffle operations

  • Shuffle partitions are special partitions created during job stages that involve a shuffle, i.e. wide transformations such as join(), groupBy(), etc.
  • The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions.
  • Based on your data size, you may need to reduce or increase the number of shuffle partitions of an RDD/DataFrame via the spark.sql.shuffle.partitions configuration (default value: 200).
  • A common starting point is 1.5 to 2 times the number of initial partitions, or the number of cores in the cluster, as in the example below.
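
A short sketch of tuning the setting in PySpark; the value of 96 and the customer_id column are assumptions (a hypothetical 48-core cluster).

# Default is 200 shuffle partitions; tune to data volume and cluster size
spark.conf.set("spark.sql.shuffle.partitions", 96)

# Wide transformations such as groupBy() will now produce 96 shuffle partitions
agg = df.groupBy("customer_id").count()
agg.write.mode("overwrite").parquet("/data/curated/counts_by_customer")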

4. Broadcast hash joins

  • When joining tables, if one table is significantly smaller than the other, the smaller table can be duplicated in memory on all executor nodes, which avoids shuffling the large table. This can be done in two ways:
  • Broadcast small tables automatically
    • Spark broadcasts any table smaller than spark.sql.autoBroadcastJoinThreshold (default: 10 MB).
    • You can increase this value to broadcast larger tables, or set it to -1 to disable automatic broadcasting.
  • Manually broadcast tables with a query hint:

SELECT /*+ BROADCAST(SMALL_TABLE) */ *
FROM LARGE_TABLE
LEFT JOIN SMALL_TABLE
  ON LARGE_TABLE.COL1 = SMALL_TABLE.COL2;
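
The same hint can be expressed in the DataFrame API with pyspark.sql.functions.broadcast(); the DataFrames, column names, and the 50 MB threshold below are placeholders.

from pyspark.sql.functions import broadcast

# Optionally raise the automatic broadcast threshold to 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Explicitly broadcast the small dimension table in the join
joined = large_df.join(
    broadcast(small_df),
    large_df["col1"] == small_df["col2"],
    "left",
)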
 
5. Cache intermediate results
Intermediate results that are reused downstream (in multiple operations) should be cached with cache() or persist(), so Spark does not recompute them from the source for every action; a minimal sketch follows.
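
A minimal PySpark sketch, assuming a DataFrame df with hypothetical status, day, and amount columns.

from pyspark.sql import functions as F

# Expensive intermediate result reused by several downstream actions
active = df.filter(F.col("status") == "active").cache()   # or .persist()

active.count()                                  # first action materializes the cache
daily = active.groupBy("day").count()           # reuses the cached data
total = active.agg(F.sum("amount")).collect()   # reuses the cached data

active.unpersist()                              # release the memory when done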
 
6. Manage memory of executor nodes 
Execution + storage memory = spark.memory.fraction * (spark.executor.memory - Reserved Memory)

spark.memory.fraction (default: 0.6) → 60% of the heap (after subtracting the reserved memory) is shared between execution and storage; the remaining 40% is kept for user data structures and Spark's internal metadata. Within the shared region, spark.memory.storageFraction (default: 0.5) sets the share of storage memory that is immune to eviction.

Reserved Memory (default: 300 MB)

For example, with spark.executor.memory = 4g, the shared region is roughly 0.6 * (4096 MB - 300 MB) ≈ 2278 MB.

We can tune the parameters below to adjust performance, for example at SparkSession creation as sketched after this list:

spark.executor.memory
spark.memory.fraction 
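
For illustration, a sketch of setting both values when building the session (memory settings must be in place before the executors start; the 8g value is an assumption).

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("memory-tuning")
    .config("spark.executor.memory", "8g")    # heap size per executor (illustrative)
    .config("spark.memory.fraction", "0.6")   # share of (heap - reserved) for execution + storage
    .getOrCreate()
)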
 

Read more on Medium.

Credits: 

Abhinav Prakash, Medium
