In this post I will be looking at a few things all combined into one script. You’ll see what I mean in a moment…

This being a blog about Python for finance, and given my admitted leaning towards scripting, backtesting and optimising systematic strategies, I thought I would look at all three at the same time…along with the concept of “multithreading” to help speed things up.

So the script we are going to create (2 scripts in fact – one operating in a multi-threaded capacity and the other single threaded) will carry out the following steps:

1. Write the code to carry out the simulated backtest of a simple moving average strategy.

2. Run brute-force optimisation on the strategy inputs (i.e. the two moving average window periods). The Sharpe Ratio will be recorded for each run, and then the data relating to the maximum achieved Sharpe will be extracted and analysed.

3. For each optimisation run, the return and volatility parameters of that particular backtest will then be passed to a function that runs Monte Carlo analysis and produces a distribution of possible outcomes for that particular set of inputs. (I realise it’s a little bit of overkill to run Monte Carlo analysis on the results of each and every optimisation run. However, my main goal here is to show how to multi-thread a process, and the benefits that can be had in terms of code run time, rather than to actually analyse all the output data.)

If you want to follow along with the post, the stock price data that I am using can be downloaded by clicking on the below:

It is daily price data for Ford (F.N) from the middle of 1972 onward. Once read into a pandas DataFrame and displayed, it should look like this:

Let’s deal first with the code to run the steps in a single threaded manner. First we import the necessary modules:

import numpy as np
import pandas as pd
import itertools
import time

Next we quickly define a helper function to calculate annualised Sharpe Ratio for a backtest returns output:

#function to calculate Sharpe Ratio - Risk free rate element excluded for simplicity
def annualised_sharpe(returns, N=252):
    return np.sqrt(N) * (returns.mean() / returns.std())
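To get a feel for what the helper does, here is a minimal sketch that runs it on a made-up series of daily returns. The random returns (and their mean and volatility) are purely hypothetical and only there so the snippet runs on its own; the point is simply that the annualised figure is the daily mean/std ratio scaled by the square root of 252.

```python
import numpy as np
import pandas as pd

def annualised_sharpe(returns, N=252):
    # same helper as above: risk free rate ignored for simplicity
    return np.sqrt(N) * (returns.mean() / returns.std())

# hypothetical daily returns, generated purely for illustration
rng = np.random.default_rng(42)
fake_returns = pd.Series(rng.normal(loc=0.0005, scale=0.01, size=252 * 5))

daily_ratio = fake_returns.mean() / fake_returns.std()
sharpe = annualised_sharpe(fake_returns)

# the annualised Sharpe is just the daily ratio multiplied by sqrt(252)
print(f"daily mean/std: {daily_ratio:.4f}, annualised Sharpe: {sharpe:.4f}")
```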

We then define our moving average strategy function as shown below. It takes 3 arguments, “data”, “short_ma” and “long_ma” – these should be pretty self explanatory. “data” is just the pricing data that will be passed to test the strategy over, and the other two are just the two moving average window period lengths.

def ma_strat(data,short_ma,long_ma):
    #work on a copy so that the DataFrame passed in is left untouched
    #(this matters once several threads share the same price data)
    data = data.copy()
    #create columns with MA values
    data['short_ma'] = np.round(data['Close'].rolling(window=short_ma).mean(),2)
    data['long_ma'] = np.round(data['Close'].rolling(window=long_ma).mean(),2)
    #create column with moving average spread differential
    data['short_ma-long_ma'] = data['short_ma'] - data['long_ma']
    #set desired number of points as threshold for spread difference and create column containing strategy 'Stance'
    X = 5
    data['Stance'] = np.where(data['short_ma-long_ma'] > X, 1, 0)
    data['Stance'] = np.where(data['short_ma-long_ma'] < -X, -1, data['Stance'])
    #create columns containing daily market log returns and strategy daily log returns
    data['Market Returns'] = np.log(data['Close'] / data['Close'].shift(1))
    data['Strategy'] = data['Market Returns'] * data['Stance'].shift(1)
    #generate equity curve as the cumulative sum of strategy log returns (so it starts at 0)
    data['Strategy Equity'] = data['Strategy'].cumsum()
    #calculate Sharpe Ratio
    #a zero standard deviation (e.g. the strategy never trades) gives nan/inf
    #or raises ZeroDivisionError, so fall back to 0 in either case
    try:
        sharpe = annualised_sharpe(data['Strategy'])
    except ZeroDivisionError:
        sharpe = 0
    if not np.isfinite(sharpe):
        sharpe = 0
    return data['Strategy Equity'], sharpe, data['Strategy'].mean(), data['Strategy'].std()
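The “Stance” logic inside ma_strat() is easier to see on a handful of numbers. The sketch below uses a made-up moving average spread and made-up market returns (neither comes from the Ford data) to show how the two np.where calls produce long (1), flat (0) and short (-1) positions, and how shifting the stance by one day means today’s return is earned on yesterday’s position.

```python
import numpy as np
import pandas as pd

X = 5  # same threshold as used in ma_strat

# made-up "short_ma - long_ma" values, for illustration only
spread = pd.Series([-7.0, -2.0, 0.0, 3.0, 8.0])

# long when the spread is above X, otherwise flat...
stance = np.where(spread > X, 1, 0)
# ...and short when the spread is below -X
stance = np.where(spread < -X, -1, stance)
print(stance)  # [-1  0  0  0  1]

# made-up daily market log returns
market = pd.Series([0.01, -0.02, 0.03, 0.01, -0.01])

# today's strategy return uses yesterday's stance, hence the shift(1)
strategy = market * pd.Series(stance).shift(1)
print(strategy.tolist())
```

The first strategy return is NaN because there is no stance for the day before the data starts.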

Next we define a third function that will carry out a MA strategy backtest and the Monte Carlo simulations for each set of window inputs that will be passed to it. It takes 3 arguments also, data, inputs and iters. “data” is the same as above, while “inputs” is the tuple of 2 window lengths from the list of window length combinations that we will create in due course. “iters” is the number of Monte Carlo simulations we want to run for each backtest optimisation result.

def monte_carlo_strat(data,inputs,iters):
    #set number of days for each Monte Carlo simulation
    days = 252
    #use the current inputs to backtest the strategy and record
    #various results metrics
    perf, sharpe, mu, sigma = ma_strat(data,inputs[0],inputs[1])
    #create two empty lists to store results of MC simulation
    mc_results = []
    mc_results_final_val = []
    # run the specified number of MC simulations and store relevant results
    for j in range(iters):
        daily_returns = np.random.normal(mu,sigma,days)+1
        price_list = [1]
        for x in daily_returns:
            price_list.append(price_list[-1]*x)
        #store the individual price path for each simulation
        mc_results.append(price_list)
        #store only the ending value of each individual price path
        mc_results_final_val.append(price_list[-1])
    return (inputs,perf, sharpe, mu,sigma,mc_results,mc_results_final_val)
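As an aside, the two nested loops above can also be expressed with NumPy arrays. The following is only a sketch of that alternative, not the code used for the timings in this post: the function name simulate_paths, the seed argument and the example mu and sigma values are all my own assumptions for illustration. It draws every daily return in one go and builds the price paths with a cumulative product.

```python
import numpy as np

def simulate_paths(mu, sigma, iters, days=252, seed=None):
    # hypothetical helper: one row per simulation, one column per day
    rng = np.random.default_rng(seed)
    growth = rng.normal(mu, sigma, size=(iters, days)) + 1
    # the cumulative product along each row gives the simulated price path
    paths = np.cumprod(growth, axis=1)
    # prepend the starting value of 1 to every path
    paths = np.hstack([np.ones((iters, 1)), paths])
    # return all paths plus the ending value of each path
    return paths, paths[:, -1]

# made-up mean and volatility of daily returns, for illustration only
paths, final_vals = simulate_paths(mu=0.0003, sigma=0.02, iters=2000, seed=0)
print(paths.shape)  # (2000, 253)
print(np.percentile(final_vals, [5, 50, 95]))
```

I have kept the loop version in the main scripts because the aim here is to compare single-threaded and multi-threaded run times on the same workload.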

Our final piece of code reads in the pricing data we want to backtest over, generates the combinations of moving average window inputs we want to test across and then runs the function for each set of windows. The process is timed and the number of seconds taken is then printed.

if __name__ == '__main__':
    #read in price data
    data = pd.read_csv('F.csv',index_col='Date',parse_dates=True)
    #generate our list of possible short window length inputs
    short_mas = np.linspace(20,50,30,dtype=int)
    #generate our list of possible long window length inputs
    long_mas = np.linspace(100,200,30,dtype=int)
    #generate a list of tuples containing all combinations of
    #long and short window length possibilities
    mas_combined = list(itertools.product(short_mas, long_mas))
    #set required number of MC simulations per backtest optimisation
    iters = 2000
    #create empty list to hold results
    results = []
    #start timer
    start_time = time.time()
    #iterate through list of MA window combinations and run function
    for inputs in mas_combined:
        res = monte_carlo_strat(data,inputs,iters)
        results.append(res)
    #print number of seconds the process took
    print("MP--- %s seconds for single---" % (time.time() - start_time))

This results in a print out of:

MP--- 425.11910939216614 seconds for single---

That’s 7 minutes and 5 seconds, or thereabouts, to run 30 x 30 = 900 backtests, each accompanied by a Monte Carlo simulation with 2000 iterations, and each iteration calculating 252 days’ worth of daily returns. That’s not bad considering the price data that was fed in for Ford stock spanned 11,820 days. That equates to the calculation of quite a few daily returns! By my calculations it would be in the region of 464,238,000 daily returns figures produced:

backtests run = 30 x 30 = 900

daily returns calculated during backtests = 900 x 11,820 = 10,638,000

daily returns calculated during Monte Carlo simulations = 900 x 2000 x 252 = 453,600,000
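If you want to check that arithmetic yourself, the short sketch below rebuilds the grid of window combinations exactly as in the script and multiplies everything out. The 11,820 figure is simply the number of days quoted above, hard-coded here so the snippet runs without the price file.

```python
import itertools
import numpy as np

# same window grids as in the main script
short_mas = np.linspace(20, 50, 30, dtype=int)
long_mas = np.linspace(100, 200, 30, dtype=int)
n_backtests = len(list(itertools.product(short_mas, long_mas)))

n_days = 11_820   # days of Ford price data, as quoted above
iters = 2000      # Monte Carlo simulations per backtest
sim_days = 252    # days simulated per Monte Carlo run

backtest_returns = n_backtests * n_days
mc_returns = n_backtests * iters * sim_days
total = backtest_returns + mc_returns

print(f"{n_backtests:,} backtests")
print(f"{backtest_returns:,} + {mc_returns:,} = {total:,}")
# 900 backtests
# 10,638,000 + 453,600,000 = 464,238,000
```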

So we could end there, deciding that 7 minutes of our time isn’t too much to ask to produce such a vast amount of simulated data. However…with just a bit of effort we could shave some serious time off that 7 minutes with a multi-threaded approach to the problem. A bit of juggling around with our code and adding a couple of intermediate functions and we’re nearly there!

The first thing we need to do is add some extra imports, so now our complete import code looks like the following:

import numpy as np
import pandas as pd
import itertools
from multiprocessing.pool import ThreadPool as Pool
import time

Next we define another short helper function which is going to allow us to take our list of moving average window tuples, and split it up into a series of smaller lists with a length of our choosing. This function is defined as such:

def chunk(it, size):
    it = iter(it)
    return iter(lambda: tuple(itertools.islice(it, size)), ())
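The two-argument form of iter() used there can look a bit cryptic, so here is a quick sketch of the helper in action. It keeps calling the lambda, which pulls the next “size” items off the iterator, until the lambda returns an empty tuple. The small range and the stand-in list of 900 pairs are just for illustration.

```python
import itertools

def chunk(it, size):
    it = iter(it)
    # keep slicing off `size` items until an empty tuple comes back
    return iter(lambda: tuple(itertools.islice(it, size)), ())

# a small example: the last chunk is simply shorter
print(list(chunk(range(10), 4)))
# [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]

# a stand-in for our 30 x 30 = 900 window pairs, split into slices of 180
pairs = list(itertools.product(range(30), range(30)))
slices = list(chunk(pairs, 180))
print(len(slices), [len(s) for s in slices])
# 5 [180, 180, 180, 180, 180]
```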

Our “ma_strat()” backtesting function remains as is with no changes necessary. Our “monte_carlo_strat()” function does however need a couple of tweaks. It has to be changed so that instead of just being passed 1 set of MA window period tuples at a time, it is now passed a whole slice of the overall list containing multiple window tuple pairs (in our case each slice will hold 180 tuple pairs, giving 900 / 180 = 5 slices). This “slice” of tuple pairs will be iterated over and each one will then have a backtest run, and a Monte Carlo simulation (consisting of 2000 iterations simulating 252 days each) carried out and the results for each tuple pair stored.

def monte_carlo_strat(data,inputs,iters):
    #set number of days for each Monte Carlo simulation
    days = 252
    #create list to hold the results for every tuple pair in this slice
    slice_results = []
    #iterate through the slice of the overall MA window tuples list that
    #has been passed to this thread
    for input_slice in inputs:
        #use the current inputs to backtest the strategy and record
        #various results metrics
        perf, sharpe, mu, sigma = ma_strat(data,input_slice[0],input_slice[1])
        #create two empty lists to store results of MC simulation
        mc_results = []
        mc_results_final_val = []
        # run the specified number of MC simulations and store relevant results
        for j in range(iters):
            daily_returns = np.random.normal(mu,sigma,days)+1
            price_list = [1]
            for x in daily_returns:
                price_list.append(price_list[-1]*x)
            #store the individual price path for each simulation
            mc_results.append(price_list)
            #store only the ending value of each individual price path
            mc_results_final_val.append(price_list[-1])
        #store the results for this tuple pair before moving on to the next one
        slice_results.append((input_slice, perf, sharpe, mu,sigma,mc_results,mc_results_final_val))
    return slice_results

Next we need to create the function that’s actually going to allow us to spawn multiple threads and manage them properly. For this we draw on the “multiprocessing” module and in particular on the ThreadPool class. The function is written as follows:

def parallel_monte_carlo(data,inputs,iters):
    #create a pool of 5 worker threads; the "with" block cleans the pool up afterwards
    with Pool(5) as pool:
        #submit one task per slice of the MA window tuples list
        future_res = [pool.apply_async(monte_carlo_strat, args=(data,input_slice,iters)) for input_slice in inputs]
        #wait for each task to finish and collect its result
        samples = [f.get() for f in future_res]
    return samples

This function spawns 5 threads and uses them to run the function concurrently across chunks of the input list, i.e. the list of moving average window tuples. The pending results are stored in the “future_res” variable, and these in turn have to be extracted into a list using the “.get()” method in a list comprehension. The sample results are then returned by the function.
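If the apply_async / get pattern is new to you, here it is stripped down to a toy example. The sum_of_squares function and the “work” list are made up for illustration; the structure (submit one task per chunk, then call .get() on each pending result) is the same as in parallel_monte_carlo().

```python
from multiprocessing.pool import ThreadPool

def sum_of_squares(numbers):
    # stand-in for the real work done on each chunk
    return sum(n * n for n in numbers)

# three made-up chunks of work
work = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]

with ThreadPool(3) as pool:
    # apply_async returns immediately with an AsyncResult object
    futures = [pool.apply_async(sum_of_squares, args=(item,)) for item in work]
    # .get() blocks until that particular task has finished
    totals = [f.get() for f in futures]

print(totals)  # [14, 77, 194]
```

Note that the results come back in the order the tasks were submitted, regardless of which thread happens to finish first.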

There is only one more block of code we need to add now: the equivalent of the end block in our last “single thread” example. This time it’s slightly different though…

if __name__ == '__main__':
    #read in price data
    data = pd.read_csv('F.csv',index_col='Date',parse_dates=True)
    #generate our list of possible short window length inputs
    short_mas = np.linspace(20,50,30,dtype=int)
    #generate our list of possible long window length inputs
    long_mas = np.linspace(100,200,30,dtype=int)
    #generate a list of tuples containing all combinations of
    #long and short window length possibilities
    mas_combined = list(itertools.product(short_mas, long_mas))
    #use our helper function to split the moving average tuples list
    #into slices of length 180
    mas_combined_split = list(chunk(mas_combined, 180))
    #set required number of MC simulations per backtest optimisation
    iters = 2000
    #start timer
    start_time = time.time()
    #call our multi-threaded function
    results = parallel_monte_carlo(data,mas_combined_split,iters)
    #print number of seconds the process took
    print("MP--- %s seconds for para---" % (time.time() - start_time))

This results in a print out of:

MP--- 268.89529037475586 seconds for para---

That’s not too bad! The time taken has dropped from 7 minutes and 5 seconds to just under 4 minutes and 30 seconds.
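To put a number on the improvement, the sketch below works out the time saved and the speed-up factor from the two print outs above (the timings are copied straight from my runs; yours will differ).

```python
# timings copied from the two print outs above
single_secs = 425.11910939216614
para_secs = 268.89529037475586

saved = single_secs - para_secs
speedup = single_secs / para_secs

print(f"saved {saved:.1f} s ({saved / single_secs:.1%}), speed-up {speedup:.2f}x")
```

That works out at a speed-up of roughly 1.58x, so well short of 5x despite using 5 threads. One likely contributor is that in standard CPython only one thread can execute Python bytecode at a time, so the pure-Python loops in the Monte Carlo step cannot all run truly in parallel.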

It’s worth noting that this reduction in code run time isn’t a given; it’s reasonably specific to the choice of inputs we entered. There is a trade off between the speed up you achieve by running threads concurrently and the time taken to spawn those threads and manage them in the background. (I wouldn’t profess to have anything but the most basic understanding of what’s actually going on behind the scenes from a technical perspective.) But I do know that trade off exists…the best speed up is going to be achieved in a situation where the process running on each thread is the main time sink of the overall process. If the process that is passed to a new thread only takes a relatively short time to run, then the benefit you get will be outweighed by the time taken to spawn that thread in the first place.

So have a play around with the inputs and see what kind of results you get…if you find anything particularly noteworthy, feel free to leave a comment below…

I will leave the task of extracting and analysing the results to you all for the moment, see if you can get a bit of practice in. I may tackle that myself for the next blog post.
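If you want a starting point, here is one possible sketch for pulling out the best run. It assumes the result layout returned by the single-threaded monte_carlo_strat(), i.e. (inputs, perf, sharpe, mu, sigma, mc_results, mc_results_final_val). The fake_result helper and every number in it are made up so the snippet runs on its own; swap in your real “results” list.

```python
import numpy as np

rng = np.random.default_rng(1)

def fake_result(inputs, sharpe):
    # hypothetical stand-in for one entry of `results`; the equity curve
    # and full price paths are left as None to keep the example small
    final_vals = list(rng.normal(1.05, 0.2, 2000))
    return (inputs, None, sharpe, 0.0002, 0.015, None, final_vals)

# made-up results for three window combinations
results = [
    fake_result((20, 100), 0.31),
    fake_result((25, 150), 0.47),
    fake_result((30, 200), 0.12),
]

# the Sharpe Ratio sits at index 2 of each result tuple
best = max(results, key=lambda res: res[2])
best_inputs, _, best_sharpe, _, _, _, best_final_vals = best

print("best windows:", best_inputs, "Sharpe:", best_sharpe)
print("5th/50th/95th percentile of simulated ending values:",
      np.percentile(best_final_vals, [5, 50, 95]))
```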

Until next time!