Why move to spark?
Although we all talk about Big Data, it usually takes some time in your career until you encounter it. For me at Wix.com it came quicker than I thought, having well over 160M users generates a lot of data — and with that comes the need for scaling our data processes.
While there are other options out there (Dask for example), we decided to go with Spark for 2 main reasons — (1) It’s the current state of the art and widely used for Big Data. (2) We had the infrastructure needed for Spark in place.
How to write in PySpark for pandas people
Chances are you’re familiar with pandas, and when I say familiar I mean fluent, your mother's tongue :)
The headline of the following talk says it all — Data Wrangling with PySpark for Data Scientists Who Know Pandas and it’s a great one.
This will be a very good time to note that simply getting the syntax right might be a good place to start but you need a lot more for a successful PySpark project, you need to understand how spark works.
It’s hard to get Spark to work properly, but when it works — it works great!
Spark in a nutshell
I would only go knee deep here but I recommend visiting the following article and reading the MapReduce explanation for a more extensive explanation — The Hitchhikers guide to handle Big Data using Spark.
The concept we want to understand here is Horizontal Scaling.
It’s easier to start with Vertical Scaling. If we have a pandas code that works great but then the data becomes too big for it, we can potentially move to a stronger machine with more memory and hope it manages. This means we still have one machine handling the entire data at the same time - we scaled vertically.
If instead we decided to use MapReduce, and split the data to chunks and let different machines handle each chunk — we’re scaling horizontally.
5 Spark Best Practices
These are the 5 spark best practices that helped me reduce runtime by 10x and scale our project.
1 - Start small — Sample the data
If we want to make big data work, we first want to see we’re in the right direction using a small chunk of data. In my project I sampled 10% of the data and made sure the pipelines work properly, this allowed me to use the SQL section in the Spark UI and see the numbers grow through the entire flow, while not waiting too long for the process to run.
From my experience if you reach your desired runtime with the small sample, you can usually scale up rather easily.
2 - Understand the basics — Tasks, partitions, cores
This is probably the single most important thing to understand when working with spark:
1 Partition makes for 1 Task that runs on 1 Core
You have to always be aware of the number of partitions you have - follow the number of tasks in each stage and match them with the correct number of cores in your spark connection. A few tips and rules of thumb to help you do this (all of them require testing with your case):
- The ratio between tasks and cores should be around 2–4 tasks for each core.
- The size of each partition should be about 200MB–400MB, this depends on the memory of each worker, tune it to your needs.
3 - Debugging spark
Spark works with lazy evaluation, which means it waits until an action is called before executing the graph of computation instructions. Examples of actions are show(), count(),...
This makes it very hard to understand where are the bugs / places that need optimization in our code. One practice which I found helpful was splitting the code to sections by using df.cache() and then using df.count() to force spark to compute the df at each section.
Now, using the spark UI you can look at the computation of each section and spot the problems. It’s important to note that using this practice without using the sampling we mentioned in (1) will probably create a very long runtime which will be hard to debug.
4 - Finding and solving skewness
Let’s start with defining skewness. As we mentioned our data is divided to partitions and along the transformations the size of each partition would likely change. This can create a wide variation in size between partitions which means we have a skewness in our data.
Finding the Skewness can be done by looking at the stage details in the spark UI and looking for a significant difference between the max and median:
The big variance (Median=3s, Max=7.5min) might suggest a skewness in data
This means that we have a few tasks that were significantly slower than the others.
Why is this bad — this might cause other stages to wait for these few tasks and leave cores waiting while not doing anything.
Preferably if you know where the skewness is coming from you can address it directly and change the partitioning. If you have no idea / no option to solve it directly, try the following:
Adjusting the ratio between the tasks and cores
As we mentioned, by having more tasks than cores we hope that while the longer task is running other cores will remain busy with the other tasks. Although this is true, the ratio mentioned earlier (2-4:1) can’t really address such a big variance between tasks' duration. We can try to increase the ratio to 10:1 and see if it helps, but there could be other downsides to this approach.
Salting the data
Salting is repartitioning the data with a random key so that the new partitions would be balanced. Here’s a code example for pyspark (using groupby which is the usual suspect for causing skewness):
5 - Problems with iterative code in spark
This one was a real tough one. As we mentioned spark uses lazy evaluation, so when running the code — it only builds a computational graph, a DAG. But this method can be very problematic when you have an iterative process, because the DAG reopens the previous iteration and becomes very big, I mean very very big . This might be too big for the driver to keep in memory. This problem is hard to locate because the application is stuck, but it appears in the spark UI as if no job is running (which is true) for a long time — until the driver eventually crashes.
This is currently an inherent problem with spark and the workaround which worked for me was using df.checkpoint() / df.localCheckpoint() every 5–6 iterations (find your number by experimenting a bit). The reason this works is that checkpoint() is breaking the lineage and the DAG (unlike cache()), saving the results and starting from the new checkpoint. The downside is that if something bad happened, you don’t have the entire DAG for recreating the df.
As I said before, it takes time to learn how to make spark do its magic but these 5 practices really pushed my project forward and sprinkled some spark magic on my code.
To conclude, this is the post I was looking for (and didn’t find) when I started my project — I hope you found it just in time.
- Data Wrangling with PySpark for Data Scientists Who Know Pandas
- The Hitchhikers guide to handle Big Data using Spark
- Spark: The Definitive Guide — chapter 18 about monitoring and debugging is amazing.