# Large Scale Text Similarity: Broadcast and Parallelize Spark DataFrame

Although creating extract-transform-load (ETL) pipelines with Apache Spark is routine for many of us, **computing ML-specific features at scale** remains a challenging and interesting problem, because each business need or use case requires exploring a variety of algorithms and eventually scaling the chosen methodology.

In this post we describe the motivation for, and the means of, performing one-to-all app similarity with Spark, using cosine similarity along with business heuristics.

Since the data was around **450K records**, processing it with standalone Python code would take ages. To be precise, it took ~12 hours using multiprocessing, and that approach does not scale as the number of records grows. A simpler approach is to compute the required features using a distributed processing framework.

# Feature Engineering Spark ML Pipeline

Computing cosine similarity between any two documents involves a series of steps:

- Cleaning the text: removing blank spaces, escape sequences, punctuation marks, etc.
- Tokenizing the text: splitting the document into words.
- Removing the stop words: we provide a list of stop words for the specific language of the text we are dealing with.
- Defining the vector size depending on the length of the document and the volume of words it contains.
- Creating vectors (could be `Word2Vec`, term frequency vectors, `CountVectorizer` vectors, or `HashingTF`).
- Computing inverse document frequency vectors.
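The steps above can be sketched with `pyspark.ml.feature`. This is a minimal sketch, not the pipeline used in this post: the column names (`clean_text`, `tokens`, `filtered`, `tf`, `tfidf`, `features`), the helper names and the `num_features` value are all hypothetical placeholders. The cleaning helper is plain Python, and the Spark imports sit inside the function because the stages need an active Spark session to be constructed.

```python
import re
import string


def clean_text(text):
    """Lowercase, drop escape sequences and punctuation, collapse whitespace."""
    text = text.lower()
    text = re.sub(r"[\r\n\t]+", " ", text)  # escape sequences -> spaces
    text = text.translate(str.maketrans("", "", string.punctuation))
    return re.sub(r"\s+", " ", text).strip()


def build_tfidf_pipeline(num_features=1 << 18):
    """Assemble tokenize -> stop words -> HashingTF -> IDF -> L2 normalize.

    Assumption: the input DataFrame has a string column named "clean_text".
    Requires an active Spark session when called.
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import (
        HashingTF, IDF, Normalizer, StopWordsRemover, Tokenizer,
    )

    stages = [
        Tokenizer(inputCol="clean_text", outputCol="tokens"),
        StopWordsRemover(inputCol="tokens", outputCol="filtered"),
        HashingTF(inputCol="filtered", outputCol="tf", numFeatures=num_features),
        IDF(inputCol="tf", outputCol="tfidf"),
        # Unit-length rows make a plain dot product equal to cosine similarity.
        Normalizer(inputCol="tfidf", outputCol="features", p=2.0),
    ]
    return Pipeline(stages=stages)


cleaned = clean_text("Hello,\n  World!\t")
```

Calling `build_tfidf_pipeline().fit(df).transform(df)` on a DataFrame with a `clean_text` column would then add a `features` column of sparse vectors.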

# Cosine Similarity

Unlike Levenshtein distance, which is natively available as part of Spark `DataFrame` functions, *Cosine Similarity* is not natively available.

Multiplying matrices (that is, taking dot products) gives a useful measure called *cosine similarity*, provided each row vector is first scaled to unit length. For non-negative vectors such as TF-IDF, cosine similarity ranges between 0 and 1. A value of 1 indicates vectors pointing in the same direction (for example, identical documents) and a value of 0 indicates vectors with no terms in common, matching the cosine of the angle between them. In practice we multiply A by B.T (the transpose of B) so that the dimensions fit. The useful property of doing this with TF-IDF matrices is that the result is a matrix of similarities between every element in A and every element in B.
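As a small illustration of the A times B.T idea, here is a sketch with numpy on toy dense matrices. The function names and the sample values are made up for this example. Rows are scaled to unit length first so that the dot products are cosine similarities.

```python
import numpy as np


def l2_normalize_rows(m):
    """Scale every row to unit length; all-zero rows are left as zeros."""
    norms = np.linalg.norm(m, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    return m / norms


def cosine_similarity_matrix(a, b):
    """Entry (i, j) is the cosine similarity between a[i] and b[j]."""
    return l2_normalize_rows(a) @ l2_normalize_rows(b).T


# Toy "TF-IDF" matrices: 2 documents in A, 3 documents in B, 3 terms.
A = np.array([[1.0, 0.0, 2.0],
              [0.0, 3.0, 0.0]])
B = np.array([[2.0, 0.0, 4.0],
              [0.0, 1.0, 0.0],
              [1.0, 1.0, 0.0]])

S = cosine_similarity_matrix(A, B)  # shape (2, 3)
```

Here `S[0, 0]` is 1 because the first rows of A and B point in the same direction, and `S[0, 1]` is 0 because those two rows share no terms.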

# The Challenge

Simply calculating the TF-IDF is feasible on a single host, even with such large datasets (my laptop handles that easily, within hours). However, multiplying the matrices is where the real challenge is.

Multiplying the two matrices, even though they are sparse, means at the very least 450K × 450K operations on vectors of size ~266K (if we’re smart with the zeros). That is a bit harder. And although numpy (which underlies sklearn) is very good at this kind of fast math, the job is challenging even for numpy.

We tried that: simply multiplying these two matrices. It works well for small enough matrices, but at sizes much smaller than what we wanted it started to fail and run out of memory. We could have worked around this by splitting one of the matrices into smaller chunks, running a number of multiplications one after the other, and then combining the results. But this reminded us that we already know a system that does exactly that: Spark.
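The single-machine chunking idea can be sketched with scipy sparse matrices. This is an illustration on small random matrices, not the code we ran; `chunked_product` and `chunk_rows` are hypothetical names. It splits A by rows, so the partial results are stacked on top of each other to rebuild the full product.

```python
import numpy as np
from scipy import sparse


def chunked_product(a, b, chunk_rows):
    """Compute a @ b.T one row-chunk of `a` at a time and stack the pieces."""
    b_t = b.T.tocsr()
    pieces = []
    for start in range(0, a.shape[0], chunk_rows):
        # Only `chunk_rows` rows of the result are materialised per step.
        pieces.append(a[start:start + chunk_rows] @ b_t)
    return sparse.vstack(pieces).tocsr()


# Small random stand-ins for the TF-IDF matrices (same number of columns).
a = sparse.random(50, 30, density=0.1, format="csr", random_state=0)
b = sparse.random(40, 30, density=0.1, format="csr", random_state=1)

result = chunked_product(a, b, chunk_rows=8)
```

Each loop iteration is independent of the others, which is what makes the work easy to hand to separate workers.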

# Spark Matrix Multiplication

Spark is great for highly parallelised, memory-intensive computations and, lo and behold, it has a `BlockMatrix` data type that implements a multiply operation. It looked like exactly what we needed. So we created the TF-IDF matrices, converted them to Spark’s `BlockMatrix` and ran `a.multiply(b.transpose())`, which is more or less what `cosine_similarity` does.

This seems easy enough, and it really is. But it doesn’t scale. We were able to multiply large matrices, but not as large as we would have liked. We tried playing with the block sizes and similar settings, but for large enough inputs it failed, either with out-of-memory errors or with runs that went on for hours and never finished.

The problem with `BlockMatrix` is that, in order to implement the multiply operation, Spark converts the sparse blocks of the matrix into *dense* (sub)matrices. Although most of our matrix is zeros, Spark still converts all of those zeros into a dense representation. That either consumes too much memory or, if we keep the blocks small, results in too many operations and re-partitions, and the job runs forever.
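A back-of-the-envelope calculation shows why dense blocks hurt. The sketch below assumes 8-byte double values and, for illustration, a block size of 1024 × 1024; both are assumptions for this estimate, not measurements from our job.

```python
import math

BYTES_PER_DOUBLE = 8  # assumption: dense entries are stored as 64-bit floats


def dense_bytes(rows, cols):
    """Memory needed to hold a rows x cols matrix densely."""
    return rows * cols * BYTES_PER_DOUBLE


n_apps = 450_000
block_side = 1024  # illustrative block size

full_result_bytes = dense_bytes(n_apps, n_apps)
one_block_bytes = dense_bytes(block_side, block_side)
blocks_per_side = math.ceil(n_apps / block_side)

print(f"full dense result: {full_result_bytes / 1e12:.2f} TB")
print(f"one dense block:   {one_block_bytes / 2**20:.0f} MiB")
print(f"blocks in result:  {blocks_per_side ** 2}")
```

Larger blocks mean more memory per task, while smaller blocks mean many more block-level multiplications and shuffles, which is the trade-off described above.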

We tried the workarounds listed below, but each of them either failed to scale or ran forever.

- `BlockMatrix` multiplication in a for loop, dividing one matrix into smaller chunks.
- A Python UDF for the dot product, with and without a for loop.
- `MinHashLSH`, a technique for quickly estimating how similar two sets are. It worked on a smaller sample, but it did not scale and the results were not up to the mark. It supports only a limited set of distance measures, and cosine similarity is not one of them.
- Reducing the TF-IDF vector size using `HashingTF` with `setNumFeatures`.
- Using `pyspark.mllib.linalg.distributed` to form the matrix and take the dot product.
- sklearn and multiprocessing with a for loop (took too long and did not scale).

# The Winner: Broadcast and Parallelize to the Rescue

Spark has two useful capabilities: *broadcast* and *parallelize*. **Broadcast** simply broadcasts the exact same data to all the workers. We use broadcast to send matrix B to all workers so that all workers have the complete B matrix. **Parallelize** chunks the data into partitions and sends each partition to a different worker. We use parallelize to send *chunks of A* to the workers so that each worker has all of B but just a small chunk of A.

The idea is to mix and match Spark with numpy. Our tests show that numpy is able to multiply a smaller matrix with a larger matrix, so if we take just a small chunk of matrix A and multiply that by matrix B, that would work and numpy would not explode.
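A minimal sketch of this pattern is shown below. It assumes an existing `SparkContext` passed in as `sc`; the function names and `chunk_rows` are hypothetical, and this is an illustration of the idea, not our production job. Collecting everything to the driver is only reasonable for small inputs; at scale the per-chunk results would be written out instead. The per-chunk function is plain scipy, so it is checked locally at the bottom without Spark.

```python
import numpy as np
from scipy import sparse


def row_chunks(a, chunk_rows):
    """Split `a` into (start_row, chunk) pairs so results can be re-ordered."""
    return [(s, a[s:s + chunk_rows]) for s in range(0, a.shape[0], chunk_rows)]


def similarity_for_chunk(chunk, b_t):
    """Runs on a worker: multiply one chunk of A by the whole of B.T."""
    start, a_chunk = chunk
    return start, (a_chunk @ b_t).tocsr()


def broadcast_and_parallelize(sc, a, b, chunk_rows):
    """Assumes `sc` is a live SparkContext; `a` and `b` are scipy CSR matrices."""
    b_t_bc = sc.broadcast(b.T.tocsr())                  # every worker gets all of B
    chunks = sc.parallelize(row_chunks(a, chunk_rows))  # each worker gets part of A
    results = chunks.map(
        lambda c: similarity_for_chunk(c, b_t_bc.value)
    ).collect()
    results.sort(key=lambda r: r[0])                    # restore row order
    return sparse.vstack([block for _, block in results]).tocsr()


# Local check of the per-chunk logic on small random matrices, no Spark needed.
a = sparse.random(20, 12, density=0.2, format="csr", random_state=0)
b = sparse.random(15, 12, density=0.2, format="csr", random_state=1)
b_t = b.T.tocsr()
local = [similarity_for_chunk(c, b_t) for c in row_chunks(a, 6)]
stacked = sparse.vstack([block for _, block in local]).tocsr()
```

Keeping the start row alongside each chunk is a design choice that lets the results be reassembled in order regardless of which worker finishes first.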

To further reduce the run time from an hour to 25 minutes for 450K app vectors, we took business input into account and matched only against the most recent popular apps from each app category. This kind of sampling is also known as stratified systematic sampling. Here are the detailed steps followed in the final solution:

- Calculate the last day's popular apps within each app category.
- Scale the popularity score to between 0 and 1 within each app category.
- Convert each app's description, title and category to TF-IDF vectors using `HashingTF` and `IDF`, after text pre-processing with `pyspark.ml.feature` and NLTK.
- Convert each app name to an index (this is how the input is taken).
- Pick the top N most popular apps within each category and convert them to a sparse matrix. (We don’t want to recommend unpopular apps, so only popular ones are candidates.)
- Broadcast the popular-apps sparse matrix to each worker node.
- For each category, take its apps and convert them to a sparse matrix.
- Parallelize chunks of each category's app matrix against the whole popular-apps matrix.
- Write the similarity results for each category, then read them back into a single DataFrame.
- Post-process: explode lists to rows and map indices back to the original app names. Increase Spark driver memory if required.

The final algorithm takes into account:

- Text similarity of app information using NLP and stratified systematic sampling.
- The last day's popularity within the app category.
- Category bias.
- Anomaly score (in case the defined categories are not logically correct).
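The popularity scaling and top-N selection steps from the list of steps can be sketched in plain Python. Min-max scaling is an assumption here, since the exact scaling method is not specified above, and the function name, tuple layout and sample data are hypothetical.

```python
from collections import defaultdict


def scale_and_pick_top_n(apps, n):
    """apps: iterable of (app_name, category, popularity).

    Returns {category: [(app_name, scaled_popularity), ...]} holding the
    n most popular apps per category, with popularity min-max scaled to 0..1.
    """
    by_category = defaultdict(list)
    for name, category, popularity in apps:
        by_category[category].append((name, popularity))

    top = {}
    for category, rows in by_category.items():
        lo = min(p for _, p in rows)
        hi = max(p for _, p in rows)
        span = (hi - lo) or 1.0  # avoid dividing by zero when all scores match
        scaled = [(name, (p - lo) / span) for name, p in rows]
        scaled.sort(key=lambda r: r[1], reverse=True)
        top[category] = scaled[:n]
    return top


# Made-up sample data for illustration only.
apps = [
    ("a", "games", 100), ("b", "games", 300), ("c", "games", 200),
    ("d", "tools", 5), ("e", "tools", 15),
]
top = scale_and_pick_top_n(apps, 2)
```

In the Spark job the same logic would typically be expressed with window functions partitioned by category, but the per-category scaling and ranking are the same idea.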

# Conclusion

When we combined the scipy sparse matrix dot product with broadcast and parallelize, we got great results in terms of both relevancy and run time. To make the solution robust to any number of apps in future, we constrained the second matrix to only the popular apps within each category. This helped us recommend only popular matches that are currently trending. We also looped the matrix multiplication over each category against the popular apps to reduce the load on each worker node, and ran the job on a 1+2 r5.xlarge EMR cluster on AWS in 25 minutes, using just 48 normalised instance hours.

This is extremely fast compared with the Spark-only approaches, which never finished.

The whole process is automated using AWS Lambda, which starts the EMR cluster, runs the job and terminates the cluster. We can take this logic further and apply it to other use cases, which helps us make solutions scalable, reduce run time and optimise our cluster resources.