# Joblib Module

The `joblib` module provides tools for running Python function calls in parallel.

Reminder: You should import a module *once*, usually at the beginning of the notebook.

Execute this cell to bring in Joblib and all of its functions.


In [None]:
# don't forget to import the module
import joblib
print(joblib)

## Joblib Parallel

Joblib provides a class named `Parallel` for setting up worker processes or worker threads.

In [None]:
print(joblib.Parallel)

In [None]:
workers=joblib.Parallel(n_jobs=4)
print(workers)
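As a minimal sketch of the choice between worker processes and worker threads: `Parallel` accepts a `prefer` argument, which is a hint rather than a guarantee. Threads tend to suit tasks that spend most of their time waiting. The `wait_then_echo` helper is hypothetical and exists only for illustration; `joblib.delayed` is covered in the next section.

```python
import time
import joblib

def wait_then_echo(value):
    # Hypothetical helper: simulates a task that mostly waits (for example on I/O).
    time.sleep(0.01)
    return value

# prefer="threads" asks joblib for worker threads instead of worker processes.
thread_workers = joblib.Parallel(n_jobs=4, prefer="threads")
echoed = thread_workers(joblib.delayed(wait_then_echo)(v) for v in range(8))
print(echoed)
```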

## Joblib Delayed

`joblib.delayed` wraps a function, altering its behavior. The delayed function does not execute when it receives its arguments. It just saves the function and its arguments for later.

If the original function looks like:
```
result = my_func(args)
```
Construct a delayed function like this:
```
result_delayed = joblib.delayed(my_func)(args)
```



In [None]:
import math
sqrt_later=joblib.delayed(math.sqrt)
print(sqrt_later)

In [None]:
sqrt_4_later=sqrt_later(4)
print(sqrt_4_later)
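A minimal sketch of what a delayed call saves, assuming (as in current joblib releases) that the saved call is a plain tuple of the function, its positional arguments, and its keyword arguments. This is an implementation detail, not something to rely on in real code.

```python
import math
import joblib

saved_call = joblib.delayed(math.sqrt)(4)

# Assumption: the saved call unpacks into (function, args, kwargs).
func, args, kwargs = saved_call
print(func, args, kwargs)

# Running the saved call by hand is roughly what a worker does later.
result = func(*args, **kwargs)
print(result)
```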

## Loops with Joblib

A loop is called embarrassingly parallel if its computations are independent.



In [None]:
many_sqrts=[]
for x in range(10):
  many_sqrts.append(math.sqrt(x))
print(many_sqrts)

You could compute each `sqrt(x)` in any order, as long as the results end up in the final list in their original order.

Step 1. Rewrite this as a list comprehension.

In [None]:
many_sqrts=[math.sqrt(x) for x in range(10)]
print(many_sqrts)

Step 2. Instead of computing the sqrts now, use the delayed sqrt.

In [None]:
many_sqrts_later=[sqrt_later(x) for x in range(10)]
print(many_sqrts_later)

Step 3. Feed the delayed functions to your pool of workers.

In [None]:
workers(many_sqrts_later)

All together now




In [None]:
joblib.Parallel(n_jobs=4)([joblib.delayed(math.sqrt)(x) for x in range(10)])
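The same pattern appears to carry over to functions that take more than one argument. The sketch below uses the built-in `pow` as a stand-in example and passes a generator expression instead of a list. The results come back in the order the calls were submitted.

```python
import joblib

# Each delayed call saves both arguments: pow(x, 2) for x = 0..9.
squares = joblib.Parallel(n_jobs=2)(
    joblib.delayed(pow)(x, 2) for x in range(10)
)
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```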

## Exercise

Create a parallel loop that searches a list of words and counts the number of characters in each word. Report the greatest number of characters in a word.

Hints:
* `string.split()` turns a string into a list of words.
* `len(string)` returns the number of characters in a string.
* `max(list)` returns the largest element in a list.



In [None]:
#your code here

In [None]:
words=["my","list","of","words"]
max(joblib.Parallel(n_jobs=4)([joblib.delayed(len)(word) for word in words]))

# Dask Module

The `dask` module provides tools for parallel and lazy computation.

Reminder: You should import a module *once*, usually at the beginning of the notebook.

Execute this cell to bring in Dask and all of its functions.


In [None]:
# don't forget to import the module
import dask
print(dask)

## Dask Delayed

Dask provides a function named `delayed`.

In [None]:
print(dask.delayed)

Dask Delayed wraps a function, altering its behavior. Instead of executing the wrapped function, calling the delayed version records the call as a task in a structure called a task graph.

If the original function looks like:
```
result = my_func(args)
```
The delayed function looks like this:
```
result = dask.delayed(my_func)(args)
```
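As a small sketch, `dask.delayed` can also be applied as a decorator, so every call to the function is delayed. The `add` function here is a hypothetical example.

```python
import dask

@dask.delayed
def add(a, b):
    # Hypothetical example function; its body only runs when computed.
    return a + b

lazy_sum = add(1, 2)      # builds a task, does not run add yet
print(type(lazy_sum))

value = lazy_sum.compute()  # runs the task
print(value)  # 3
```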



## Dask Graph Visualization

The *result* of a `delayed` function is an object which knows which graph it is a part of.

The dask function `dask.visualize()` allows us to inspect this graph.
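Drawing the graph typically needs the optional Graphviz dependency. Where that is not installed, a rough alternative is to look at the graph as a dictionary of tasks. The sketch below assumes the `__dask_graph__()` method from Dask's collection protocol; the exact keys and the number of entries may differ between Dask versions.

```python
import dask

def square(x):
    return x * x

a = dask.delayed(square)(3)
b = dask.delayed(square)(4)
total = dask.delayed(sum)([a, b])

# Assumption: the graph behaves like a mapping from task keys to tasks.
graph = dict(total.__dask_graph__())
task_count = len(graph)
print(task_count, "entries in the graph")
for key in graph:
    print(key)
```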

## Dask Computation
A Dask graph does not compute its function calls until it is asked to. This is called *lazy evaluation*.

You can prompt execution by calling the `.compute()` method of the result object.
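When there are several delayed results, the function `dask.compute()` can evaluate them in one call. A minimal sketch, using `math.sqrt` as a stand-in for real work:

```python
import math
import dask

lazy_roots = [dask.delayed(math.sqrt)(x) for x in (1, 4, 9)]

# dask.compute takes any number of lazy objects and returns a tuple of results.
roots = dask.compute(*lazy_roots)
print(roots)  # (1.0, 2.0, 3.0)
```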

## Example 1

A loop that computes the sum of the first five square numbers.

In [None]:
def square(x):
    return x * x

data = [1, 2, 3, 4, 5]
output = []

for x in data:
    a = square(x)
    output.append(a)

total = sum(output)

print(total)

Dask delayed version of this loop.

This loop computes the sum of the first five squares, but all the function calls are delayed.

 - Even the sum at the end!

In [None]:
def square(x):
    return x * x

data = [1, 2, 3, 4, 5]
output = []

dask.config.set(num_workers=4)

for x in data:
    a = dask.delayed(square)(x)
    output.append(a)

total = dask.delayed(sum)(output)

print('total is a', type(total))

Visualizing the graph.

In [None]:
dask.visualize(total)

Prompting the computation to (finally) occur.

In [None]:
total = total.compute()
print(total)

## Exercise

Create a loop that searches a list of words and counts the number of characters in each word. Report the greatest number of characters in a word.

Hints:
* `string.split()` turns a string into a list of words.
* `len(string)` returns the number of characters in a string.
* `max(list)` returns the largest element in a list.

Delay each function call, and then visualize the graph.

In [None]:
#your code here

# Why choose?

Joblib is generally considered the faster choice for single-node jobs where all the processes can share memory and all the computations fit in memory.

Dask can scale beyond a single node and beyond computations that fit in memory.

# Parallel Arrays

Reminder: NumPy arrays

In [None]:
import numpy
my_array=numpy.asarray([1,2,3])
print(my_array)

We use array operations and array methods to perform fast computations.

In [None]:
squares=my_array**2 # array operation
total=squares.sum() # array methods
print(total)

## Joblib and NumPy Arrays

We can use joblib to operate on arrays, but joblib parallelizes function calls, not NumPy's array methods. The array work has to be wrapped in a function first.

In [None]:
import numpy as np
from joblib import Parallel, delayed

def sum_row(row):
  return np.sum(row)

# Create a large NumPy array
data = np.random.rand(100, 10000)

# Parallelize the row sums using joblib
results = Parallel(n_jobs=4)(delayed(sum_row)(row) for row in data)

# Results now contains the sum of each row
results
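One task per row means many small tasks. A possible variation, sketched here under the assumption that fewer and larger tasks may reduce overhead, is to split the array into blocks of rows with `numpy.array_split` and sum each block in one call. The `sum_rows_in_block` helper is hypothetical.

```python
import numpy as np
from joblib import Parallel, delayed

def sum_rows_in_block(block):
    # Hypothetical helper: NumPy sums every row of the block in one call.
    return block.sum(axis=1)

data = np.random.rand(100, 1000)

# Split the 100 rows into 4 blocks of 25 rows each.
blocks = np.array_split(data, 4)

# One task per block instead of one task per row.
partial_sums = Parallel(n_jobs=2)(delayed(sum_rows_in_block)(b) for b in blocks)

# Stitch the per-block results back into one array of row sums.
row_sums = np.concatenate(partial_sums)
print(row_sums.shape)
```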

## Dask Arrays

Dask provides an array submodule whose arrays wrap NumPy arrays: one Dask array is made up of one or more smaller NumPy arrays called chunks.

In [None]:
import dask.array

In [None]:
print(dask.array.Array)

In [None]:
my_array = dask.array.array([1,2,3])
print(my_array)
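An existing NumPy array can also be wrapped with `dask.array.from_array`, where the `chunks` argument controls how the data is split. A minimal sketch with a small array, so the chunk layout is easy to read:

```python
import numpy as np
import dask.array as da

np_data = np.arange(12).reshape(3, 4)

# Split the 3x4 array into two chunks of shape 3x2.
lazy_data = da.from_array(np_data, chunks=(3, 2))
print(lazy_data.chunks)  # ((3,), (2, 2))

# Operations stay lazy until .compute() is called.
lazy_total = (lazy_data ** 2).sum()
total = lazy_total.compute()
print(total)
```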

Dask array methods are naturally delayed.

- Note the `.compute()` call, which triggers the computation.
- `.compute()` can also accept worker arguments such as `num_workers`.

In [None]:
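# Create a Dask array split into four chunks of 25 rows each,
# so that there is more than one piece of work to share among workers
data = dask.array.random.random((100, 1000), chunks=(25, 1000))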

# Calculate the sum of each row
results = data.sum(axis=1).compute(num_workers=4)

# Results now contains the sum of each row
print(results)