Apache Spark 3.0 (available in Databricks Runtime 7.0)
- neovijayk
- Jul 7, 2020
Here are the biggest new features in Spark 3.0:
- 2x performance improvement on TPC-DS over Spark 2.4, enabled by adaptive query execution, dynamic partition pruning and other optimizations
- ANSI SQL compliance
- Significant improvements in pandas APIs, including Python type hints and additional pandas UDFs
- Better Python error handling, simplifying PySpark exceptions
- New UI for structured streaming
- Up to 40x speedups for calling R user-defined functions
- No major code changes are required to adopt this version of Apache Spark
Improved Spark SQL Engine (both performance and ANSI compatibility)
Spark 3.0 performed roughly 2x better than Spark 2.4 in total runtime.
The new Adaptive Query Execution (AQE) framework improves performance and simplifies tuning by generating a better execution plan at runtime, even if the initial plan is suboptimal due to absent or inaccurate data statistics and misestimated costs. Because of the separation of storage and compute in Spark, data arrival can be unpredictable. For all these reasons, runtime adaptivity becomes more critical for Spark than for traditional systems. This release introduces three major adaptive optimizations (a configuration sketch follows this list):
Dynamically coalescing shuffle partitions simplifies or even avoids tuning the number of shuffle partitions. Users can set a relatively large number of shuffle partitions at the beginning, and AQE can then combine adjacent small partitions into larger ones at runtime.
Dynamically switching join strategies partially avoids executing suboptimal plans due to missing statistics and/or size misestimation. This adaptive optimization can automatically convert sort-merge join to broadcast-hash join at runtime, further simplifying tuning and improving performance.
Dynamically optimizing skew joins is another critical performance enhancement, since skew joins can lead to an extreme imbalance of work and severely downgrade performance. After AQE detects any skew from the shuffle file statistics, it can split the skew partitions into smaller ones and join them with the corresponding partitions from the other side. This optimization can parallelize skew processing and achieve better overall performance.
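To make the knobs above concrete, here is a minimal PySpark sketch. The property names follow the Spark 3.0 configuration documentation; the values are illustrative assumptions, not tuning recommendations.

```python
from pyspark.sql import SparkSession

# Minimal sketch: enable Adaptive Query Execution and its sub-features in Spark 3.0.
spark = (
    SparkSession.builder
    .appName("aqe-sketch")
    # Master switch for AQE (off by default in Spark 3.0)
    .config("spark.sql.adaptive.enabled", "true")
    # Dynamically coalesce adjacent small shuffle partitions at runtime
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Dynamically split skewed partitions detected from shuffle statistics
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # Start with a generous partition count and let AQE coalesce it
    .config("spark.sql.shuffle.partitions", "1000")
    .getOrCreate()
)
```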
Dynamic Partition Pruning is applied when the optimizer is unable to identify at compile time the partitions it can skip. This is not uncommon in star schemas, which consist of one or multiple fact tables referencing any number of dimension tables. In such join operations, we can prune the partitions the join reads from a fact table, by identifying those partitions that result from filtering the dimension tables. In a TPC-DS benchmark, 60 out of 102 queries show a significant speedup between 2x and 18x.
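As a small sketch of the idea (the sales and dates tables and their columns are hypothetical), a selective filter on the dimension side lets Spark prune partitions of the fact table at runtime; the behavior is controlled by spark.sql.optimizer.dynamicPartitionPruning.enabled, which is on by default.

```python
# Hypothetical star schema: `sales` is a large fact table partitioned by sale_date,
# `dates` is a small dimension table. With dynamic partition pruning (on by default),
# the filter on `dates` becomes a runtime filter over the sale_date partitions.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

revenue = spark.sql("""
    SELECT s.store_id, SUM(s.amount) AS revenue
    FROM sales s
    JOIN dates d ON s.sale_date = d.sale_date
    WHERE d.fiscal_quarter = 'Q4-2019'
    GROUP BY s.store_id
""")
revenue.explain()  # the fact-table scan shows a dynamic pruning expression in its partition filters
```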
ANSI SQL compliance is critical for workload migration from other SQL engines to Spark SQL. To improve compliance, this release switches to the Proleptic Gregorian calendar and also enables users to forbid using the reserved keywords of ANSI SQL as identifiers. Additionally, we’ve introduced runtime overflow checking in numeric operations and compile-time type enforcement when inserting data into a table with a predefined schema. These new validations improve data quality.
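A hedged sketch of how these checks surface, assuming a Spark 3.0 session named spark; both properties below exist in 3.0, and ANSI mode itself is off by default.

```python
# Turn on ANSI-compliant behaviour (off by default in Spark 3.0).
spark.conf.set("spark.sql.ansi.enabled", "true")

# Runtime overflow checking: with ANSI mode on, integer overflow raises an
# error instead of silently wrapping around.
try:
    spark.sql("SELECT CAST(2147483647 AS INT) + 1").collect()
except Exception as err:
    print("overflow rejected:", err)

# Store-assignment policy governs type checks when inserting into a table
# with a predefined schema; ANSI is the default policy in Spark 3.0.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
```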
Join hints: While we continue to improve the compiler, there’s no guarantee that the compiler can always make the optimal decision in every situation — join algorithm selection is based on statistics and heuristics. When the compiler is unable to make the best choice, users can use join hints to influence the optimizer to choose a better plan. This release extends the existing join hints by adding new hints: SHUFFLE_MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL.
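A short sketch of the new hints in both the SQL and DataFrame APIs; the tables and columns are made up, and a SparkSession named spark is assumed.

```python
customers = spark.createDataFrame([(1, "Ada"), (2, "Lin")], ["id", "name"])
orders = spark.createDataFrame([(1, 10.0), (2, 5.0)], ["customer_id", "total"])
customers.createOrReplaceTempView("customers")
orders.createOrReplaceTempView("orders")

# SQL form: ask the optimizer to build a shuffle hash join, hashing `orders`.
spark.sql("""
    SELECT /*+ SHUFFLE_HASH(o) */ c.name, o.total
    FROM customers c JOIN orders o ON c.id = o.customer_id
""").show()

# DataFrame form: the same strategies are available through DataFrame.hint().
customers.join(orders.hint("shuffle_merge"),
               customers.id == orders.customer_id).show()
```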
Spark 3.0 brings several enhancements to the PySpark APIs:
New pandas APIs with type hints: pandas UDFs were initially introduced in Spark 2.3 for scaling user-defined functions in PySpark and integrating pandas APIs into PySpark applications. However, the existing interface became difficult to understand as more UDF types were added. This release introduces a new pandas UDF interface that leverages Python type hints to address the proliferation of pandas UDF types. The new interface is more Pythonic and self-descriptive (see the sketch after this list).
New types of pandas UDFs and pandas function APIs: This release adds two new pandas UDF types, iterator of Series to iterator of Series and iterator of multiple Series to iterator of Series. These are useful for data prefetching and expensive initialization. Two new pandas function APIs, map and co-grouped map, are also added. More details are available in this blog post.
Better error handling: PySpark error handling is not always friendly to Python users. This release simplifies PySpark exceptions, hides the unnecessary JVM stack trace, and makes them more Pythonic.
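Below is a small sketch of the Spark 3.0 style (the DataFrame and column names are illustrative): the Python type hints on the UDF declare its variant, and the iterator form covers the prefetching and expensive-initialization case.

```python
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Series -> Series pandas UDF; the type hints replace the old PandasUDFType enum.
@pandas_udf("long")
def multiply(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

# Iterator of Series -> Iterator of Series; handy when each task needs
# one-off expensive setup or wants to prefetch batches.
@pandas_udf("long")
def plus_one(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    offset = 1  # stand-in for expensive, once-per-task initialization
    for batch in batches:
        yield batch + offset

df = spark.range(5)
df.select(multiply("id", "id").alias("square"),
          plus_one("id").alias("next")).show()
```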
Hydrogen, streaming and extensibility
With Spark 3.0, we’ve finished key components for Project Hydrogen as well as introduced new capabilities to improve streaming and extensibility.
Accelerator-aware scheduling: Project Hydrogen is a major Spark initiative to better unify deep learning and data processing on Spark. GPUs and other accelerators have been widely used for accelerating deep learning workloads. To make Spark take advantage of hardware accelerators on target platforms, this release enhances the existing scheduler to make the cluster manager accelerator-aware. Users can specify accelerators via configuration with the help of a discovery script. Users can then call the new RDD APIs to leverage these accelerators.
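A hedged configuration sketch: the discovery script path and resource amounts are assumptions for illustration, while the property names and the TaskContext.resources() API are part of Spark 3.0.

```python
from pyspark import TaskContext
from pyspark.sql import SparkSession

# Declare GPU resources: how many each executor offers, how many each task
# needs, and a script that reports the GPU addresses available on the host.
spark = (
    SparkSession.builder
    .config("spark.executor.resource.gpu.amount", "2")
    .config("spark.task.resource.gpu.amount", "1")
    .config("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpus.sh")
    .getOrCreate()
)

# Inside a task, the addresses assigned to that task are exposed via TaskContext.
def train_partition(rows):
    gpus = TaskContext.get().resources()["gpu"].addresses  # e.g. ["0"]
    # ... hand `gpus` to the deep learning framework here ...
    return rows

spark.sparkContext.parallelize(range(4), numSlices=2).mapPartitions(train_partition).collect()
```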
New UI for structured streaming: Structured streaming was initially introduced in Spark 2.0. After 4x year-over-year growth in usage on Databricks, more than 5 trillion records per day are now processed with structured streaming on Databricks. This release adds a dedicated Spark UI for inspecting these streaming jobs. The new UI offers two sets of statistics: 1) aggregate information about completed streaming query jobs and 2) detailed statistics about individual streaming queries.
Observable metrics: Continuously monitoring changes to data quality is a highly desirable feature for managing data pipelines. This release introduces such monitoring for both batch and streaming applications. Observable metrics are arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (e.g., finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.
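Spark 3.0 exposes observe() on Dataset in the Scala/Java API; the equivalent PySpark surface (DataFrame.observe plus the Observation helper) only landed in a later release (3.3), so treat the Python sketch below as an illustration of the concept rather than of the 3.0 Python API.

```python
from pyspark.sql import Observation
from pyspark.sql.functions import count, lit, max as max_

df = spark.range(10).withColumnRenamed("id", "x")

# Attach named aggregate metrics to the query; they are computed over exactly
# the rows processed between two completion points.
metrics = Observation("data_quality")
observed = df.observe(metrics,
                      count(lit(1)).alias("row_count"),
                      max_("x").alias("max_x"))

observed.count()    # running an action completes the query...
print(metrics.get)  # ...and the observed metrics become available, e.g. {'row_count': 10, 'max_x': 9}
```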
New catalog plug-in API: The existing data source API lacks the ability to access and manipulate the metadata of external data sources. This release enriches the data source V2 API and introduces the new catalog plug-in API. For external data sources that implement both the catalog plug-in API and the data source V2 API, users can directly manipulate both the data and the metadata of external tables via multipart identifiers, once the corresponding external catalog is registered.
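Registration follows the convention spark.sql.catalog.&lt;name&gt; = &lt;implementation class&gt;; the catalog name and implementation class below are hypothetical stand-ins for any catalog shipped by an external data source.

```python
from pyspark.sql import SparkSession

# Register an external catalog under the name `mycat`. The implementation class
# is hypothetical; real examples are catalog classes provided by V2 data sources.
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.mycat", "com.example.MyCatalogImpl")
    .getOrCreate()
)

# After registration, data and metadata are addressed with multipart identifiers.
spark.sql("SELECT * FROM mycat.analytics.events LIMIT 10")
```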