Creating parallel agents

March 16, 2019, 10:01 p.m.

This blog post is mostly for my own benefit, as an easily accessible set of notes that I can refer to from time to time. As such, it will not be explained in much detail.

It is useful, for example in Reinforcement Learning, to run multiple variants of a model in parallel, each iterating through some list of values. This blog post outlines a bare-minimum working example of how to approach this in Python. The model (agent) is built in such a way that it is easy to probe and understand how the parallel processing is occurring; as such, it is not built as an actually usable agent. However, using a real agent is just a matter of substituting a different Agent class.

Setup

# #############################################
#                 IMPORTS
# #############################################
# Libraries needed for parallel processing
import multiprocessing
from multiprocessing import Pool
from functools import partial

# Libraries for creating agent
import torch
from torch import nn
from torch import Tensor
from torch.autograd import Variable
import numpy as np


# #############################################
#                  SETTINGS
# #############################################
num_cores = multiprocessing.cpu_count() # use all available CPU cores

Agent/Model

Here we will define the class for the model (agent).

class Agent(nn.Module):
    def __init__(self, w):
        super().__init__()
        self.fc1 = nn.Linear(3, 1)
        self.fc1.weight.requires_grad = False
        self.set_weights({"fc1.weight": torch.tensor([w], dtype=torch.float32)})
    def forward(self, x):
        # dummy forward pass (not actually used in this example)
        return torch.sum(self.fc1.weight)
    def score(self):
        # dummy operation to make it use up lots of CPU for some portion of time
        for i in range(12000):
            i**i
        # return a score value (here just the sum of the weights)
        return self.fc1.weight.sum().item()
    def get_weights(self, copy=False):
        if copy:
            return {"fc1.weight": self.fc1.weight.clone()}
        else:
            return {"fc1.weight": self.fc1.weight}
    def set_weights(self, w):
        self.load_state_dict(w, strict=False)
    def jitter_weights(self, inplace=False):
        w = {"fc1.weight": self.fc1.weight if inplace else self.fc1.weight.clone()}
        # random step of -1, 0 or +1 for each weight
        step = torch.randint(3, (3,), dtype=torch.float32) - 1.0
        w["fc1.weight"].add_(step)
        if not inplace:
            return w
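
As a quick sanity check (not part of the parallel run itself), the class can be exercised on its own. The snippet below is just a minimal sketch that calls the methods defined above:

# Quick sanity check of the Agent class on its own
a = Agent([1, 1, 1])
print(a.get_weights())                                        # current weights, as a state-dict style dictionary
a.set_weights({"fc1.weight": torch.tensor([[2., 2., 2.]])})
print(a.score())                                              # 6.0 (the sum of the weights)
print(a.jitter_weights())                                     # a randomly perturbed copy; the agent's own weights are unchanged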

The Parallel Processing

We will create a list of agents, as well as a list of weights we want those agents to explore.

# Create multiple agents, and a list of weights to try out in parallel
n = 6
batch_agents = [Agent([1,1,1]) for _ in range(n)]
batch_w = [{'fc1.weight': torch.tensor([[1., 1., 1.]])},
           {'fc1.weight': torch.tensor([[2., 2., 2.]])},
           {'fc1.weight': torch.tensor([[3., 3., 3.]])},
           {'fc1.weight': torch.tensor([[4., 4., 4.]])},
           {'fc1.weight': torch.tensor([[5., 5., 5.]])},
           {'fc1.weight': torch.tensor([[6., 6., 6.]])}]
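
Typing the list out by hand makes it obvious what is being fed in, but the same list of weight dictionaries could just as well be generated programmatically, e.g.:

# Equivalent, programmatic construction of the same list of weight dictionaries
batch_w = [{"fc1.weight": torch.tensor([[float(i)] * 3])} for i in range(1, n + 1)]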

We can then define a function that will be executed as we iterate through the list of agents. What is important to note is that only the first argument of this function is filled with the values we iterate over. But we want to iterate through both the list of agents and the list of weights. We can get around this by packaging the two corresponding elements together (with zip) and then unpacking them inside the function.

The function does allow us to pass additional positional and keyword arguments; however, these can only be values that remain fixed as we iterate.

In this example, we will just iterate through the agents and weight combinations in order to return a score value.

def process_input(aw, b=None, **kwargs):
    agent, weights = aw
    print("b: {}  **kwargs: {} w: {}".format(b, kwargs, weights["fc1.weight"].data))
    agent.set_weights(weights)
    return agent.score()

We can now iterate through the agents and weights and put them through the process_input() function.

One thing to notice is that the print statements do not necessarily appear in the same order as the elements of the input list. The tasks are executed in parallel, and some worker processes simply happen to start (or finish) slightly earlier than others. Importantly, though, the returned outputs are guaranteed to be in the correct order.

# Run the agents in parallel, feeding in the weights
# AND returning the scores in the correct order as a list
with Pool(num_cores) as pool:
    # scores = pool.map(partial(SOME_FUNC, ARGS, KWARGS), ITERABLE)
    scores = pool.map(partial(process_input, b=45, c=33), zip(batch_agents, batch_w))
# the with-statement closes the pool automatically on exit
print("\nSCORES: ", scores)

[OUTPUT]

# b: 45  **kwargs: {'c': 33} w: tensor([[2., 2., 2.]])
# b: 45  **kwargs: {'c': 33} w: tensor([[3., 3., 3.]])
# b: 45  **kwargs: {'c': 33} w: tensor([[1., 1., 1.]])
# b: 45  **kwargs: {'c': 33} w: tensor([[5., 5., 5.]])
# b: 45  **kwargs: {'c': 33} w: tensor([[4., 4., 4.]])
# b: 45  **kwargs: {'c': 33} w: tensor([[6., 6., 6.]])
#
# SCORES:  [3.0, 6.0, 9.0, 12.0, 15.0, 18.0]
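
One practical caveat: if you run this as a script on a platform whose default start method is spawn (e.g. Windows) rather than fork (the default on Linux), the multiprocessing code needs to live behind an if __name__ == "__main__": guard. A rough sketch:

# Rough sketch: guarding the parallel section, as required by the "spawn" start method
if __name__ == "__main__":
    with Pool(num_cores) as pool:
        scores = pool.map(partial(process_input, b=45, c=33), zip(batch_agents, batch_w))
    print("\nSCORES: ", scores)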

We can probe the list of weights, and the agents' own weights, to confirm that each agent was assigned the correct weights.

batch_w
# [{'fc1.weight': tensor([[1., 1., 1.]])},
#  {'fc1.weight': tensor([[2., 2., 2.]])},
#  {'fc1.weight': tensor([[3., 3., 3.]])},
#  {'fc1.weight': tensor([[4., 4., 4.]])},
#  {'fc1.weight': tensor([[5., 5., 5.]])},
#  {'fc1.weight': tensor([[6., 6., 6.]])}]
[agent.fc1.weight for agent in batch_agents]
# [Parameter containing:
#  tensor([[1., 1., 1.]]), Parameter containing:
#  tensor([[2., 2., 2.]]), Parameter containing:
#  tensor([[3., 3., 3.]]), Parameter containing:
#  tensor([[4., 4., 4.]]), Parameter containing:
#  tensor([[5., 5., 5.]]), Parameter containing:
#  tensor([[6., 6., 6.]])]

NOTE: This way of doing things leads to some issues if you want the agent to perform random actions (briefly illustrated below). See my next blog post for how to overcome this issue.
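
As a small illustration of the kind of problem meant here (assuming the default fork start method on Linux, where every worker inherits the parent's NumPy random state), the workers can end up producing identical "random" draws:

# Hypothetical illustration of the problem: every forked worker inherits the same
# NumPy random state, so their first "random" draws may all come out identical
def random_action(_):
    return np.random.randint(0, 100)

with Pool(num_cores) as pool:
    print(pool.map(random_action, range(4)))  # may print the same value several times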
