
Equality of opportunity in supervised learning

May 7, 2018

Equality of opportunity in supervised learning Hardt et al., NIPS’16

With thanks to Rob Harrop for highlighting this paper to me.

There is a lot of concern about discrimination and bias entering our machine learning models. Today’s paper choice introduces two notions of fairness: equalised odds, and equalised opportunity, and shows how to construct predictors that are fair under these criteria. One very appealing feature of the model is that in the case of uncertainty caused by under-representation in the training data, the cost of less accurate decision making in that demographic is moved from the protected class (who might otherwise for example not be offered loans), to the decision maker. I’m going to approach the paper backwards, and start with the case study, as I find a motivating example really helps with the intuition.

Loans, race, and FICO scores

We examine various fairness measures in the context of FICO scores with the protected attribute of race. FICO scores are a proprietary classifier widely used in the United States to predict credit worthiness. Our FICO data is based on a sample of 301,536 TransUnion TransRisk scores from 2003.

We’re interested in comparing scores, the risk of defaulting on a loan, and race. In the dataset, race is restricted to four values: Asian, white non-Hispanic (labeled ‘white’ in figures), Hispanic, and black. The scores are the results of complicated proprietary classifiers. But are they fair? And how should we think about fairness? A threshold score of 620 is commonly used for prime-rate loans, and that corresponds to an any-account default rate of 18% (that is, 82% do not default).

Here are the marginal distributions for each group:

A max profit model with no fairness constraints at all picks a different threshold for each group, such that 82% of the people in that group do not default. It’s easy to see for example that this makes it less likely for a black person to get a loan than for an Asian person.

Next we might try a race blind model. This model requires the threshold to be the same for every group, so it picks a single threshold at which 82% of people do not default overall. Such a model would extract 99.3% of the profit available under the max profit model. This sounds good on the surface, but when you dig into it and examine per-group what it means to reach that single cut-off, it becomes readily apparent that it is not actually “fair” and disadvantages blacks and Hispanics vs whites and Asians:

When race is redundantly encoded (i.e., it can be inferred from other variables), then race blindness degenerates into max profit.

Moving on, a popular approach to fixing the issues with the race blind model is demographic parity. Under demographic parity we go back to choosing a different threshold for each group, but instead of choosing that threshold based on likelihood of default, we choose it such that the fraction of group members that qualify for loans is the same across all groups. That sounds good initially too…

Unfortunately, as was already argued by Dwork et al., the notion (of demographic parity) is seriously flawed on two counts. First, it doesn’t ensure fairness. Indeed, the notion permits that we accept qualified applications in the demographic A = 0, but unqualified individuals in A = 1, so long as the percentages of acceptance match. This behaviour can arise naturally when there is little or no training data available within A = 1. Second, demographic parity often cripples the utility that we hope to achieve.

(In the above paragraph, it’s assumed that the attribute A takes on a binary value, but it’s easy to generalise to the categorical case).

In the FICO dataset we’re looking at, using demographic parity obtains only 69.8% of profit available under the max profit model, and actually ends up reversing the bias such that e.g. white people that would not default have a significantly harder time qualifying for loans.

So now we arrive at the first of the two models introduced in the paper, equal opportunity. Equal opportunity requires non-discrimination only within the ‘advantaged’ outcome group. In this case, it says that people who pay back their loan have an equal opportunity of getting the loan in the first place. For people that will end up defaulting, we do not require equal opportunity of getting a loan in the first place. To achieve equal opportunity, we pick per-group thresholds such that the fraction of non-defaulting group members that qualify for loans is the same across all groups. Equal opportunity does much better than demographic parity, extracting 92.8% of the potential profit available under the max profit model.
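To make the threshold-picking concrete, here is a minimal sketch on toy data. The scores, labels, and helper names (`threshold_for_tpr`, `true_positive_rate`) are invented for illustration and do not come from the paper or the FICO sample. For each group it finds the highest threshold at which at least a target fraction of non-defaulters would qualify.

```python
import math

def threshold_for_tpr(scores, repaid, target_tpr):
    """Highest threshold at which at least `target_tpr` of the group's
    non-defaulters score at or above it. Inputs are parallel lists."""
    repayer_scores = sorted((s for s, r in zip(scores, repaid) if r), reverse=True)
    if not repayer_scores:
        raise ValueError("group has no non-defaulters")
    # How many non-defaulters must qualify (epsilon guards float round-off).
    needed = max(1, math.ceil(target_tpr * len(repayer_scores) - 1e-9))
    return repayer_scores[needed - 1]

def true_positive_rate(scores, repaid, threshold):
    """Fraction of non-defaulters whose score meets the threshold."""
    repayer_scores = [s for s, r in zip(scores, repaid) if r]
    return sum(s >= threshold for s in repayer_scores) / len(repayer_scores)

# Toy data, invented for this sketch (not the FICO sample).
group_a = ([500, 600, 650, 700, 750], [False, True, True, True, True])
group_b = ([450, 520, 580, 640, 700, 720], [False, True, False, True, True, True])

threshold_a = threshold_for_tpr(*group_a, target_tpr=0.75)
threshold_b = threshold_for_tpr(*group_b, target_tpr=0.75)
```

The two groups end up with different score thresholds but the same fraction of non-defaulters qualifying, which is the property equal opportunity asks for. With real data the rates will rarely match exactly, so some tolerance would be needed.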

Even stronger than equal opportunity is equalized odds.

Equalized odds requires both the fraction of non-defaulters that qualify for loans and the fraction of defaulters that qualify for loans to be constant across groups. This cannot be achieved with a single threshold for each group, but requires randomization. There are many ways to do it; here we pick two thresholds for each group, so above both thresholds people always qualify, and between the thresholds people qualify with some probability.
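The two-threshold rule can be sketched as follows. This is only an illustration of the randomized decision described above: the function names and the `p_between` parameter are my own, and the choice to treat a score exactly equal to a threshold as meeting it is an assumption.

```python
import random

def qualifies(score, lower, upper, p_between, rng=random):
    """Always accept at or above `upper`, never accept below `lower`,
    and accept with probability `p_between` in between."""
    if score >= upper:
        return True
    if score < lower:
        return False
    return rng.random() < p_between

def expected_accept_rate(scores, lower, upper, p_between):
    """Expected fraction of `scores` accepted under the rule above."""
    total = 0.0
    for s in scores:
        if s >= upper:
            total += 1.0
        elif s >= lower:
            total += p_between
    return total / len(scores)

# Hypothetical scores and thresholds, purely for illustration.
rate = expected_accept_rate([500, 600, 700, 800], lower=550, upper=750, p_between=0.5)
```

Applying `expected_accept_rate` separately to the defaulters and the non-defaulters of each group gives the two rates that equalized odds requires to match across groups; the extra degree of freedom from `p_between` is what makes matching both at once possible.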

Equalized odds achieves 80.2% of the potential profits available under the max profit model. The difference comes down to the fact that under equal opportunity the classifier can make use of its better accuracy among whites, but under equalized odds this is viewed as unfair, since white people who wouldn’t pay their loans have a harder time getting them than minorities who wouldn’t pay their loans. “An equal odds classifier must classify everyone as poorly as the hardest group, which is why it costs over twice as much in this case.”

The following charts show visually the performance of the five different models discussed so far in terms of the fraction of non-defaulters that can successfully obtain loans, and the fraction of the maximum profit that is achieved.

You can also see how the FICO score thresholds vary across models:

Formalising equal odds and equal opportunity

Let X be the set of available features as input to a model, and A be the protected attribute (e.g., race in the example above). Y is the true outcome (i.e., the labels in our labeled dataset), \widehat{Y} is the predictor which predicts a value of Y given X. We’ll start with the case where Y is a binary decision and A is a binary attribute.

The notion we propose is “oblivious,” in that it is based only on the joint distribution, or joint statistics, of the true target Y, the predictions \widehat{Y}, and the protected attribute A. In particular, it does not evaluate the features in X nor the functional form of the predictor \widehat{Y}(A) nor how it was derived.

A predictor \widehat{Y} satisfies equalized odds with respect to a protected attribute A and outcome Y if \widehat{Y} and A are independent conditional on Y. Which is equivalent to saying:

\displaystyle P\{\widehat{Y} = 1 | A = 0, Y = y\} = P\{\widehat{Y} = 1 | A = 1, Y = y\}, y \in \{0,1\}

For the outcome y=1, \widehat{Y} will have equal true positive rates across the two demographics A=0 and A=1, and for y=0 it will have equal false positive rates.

Equalized odds enforces that the accuracy is equally high in all demographics, punishing models that perform well only on the majority.

For equal opportunity we relax the condition that odds must be equal in the case that Y=0. We can think of this as saying that every demographic has equal opportunity to participate when Y = 1. A binary predictor satisfies equal opportunity with respect to A and Y if:

\displaystyle P\{\widehat{Y} = 1 | A = 0, Y = 1 \} = P\{\widehat{Y} = 1 | A = 1, Y = 1\}

Equal opportunity typically allows for stronger utility than equalized odds, as we saw in the FICO example.
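Because both definitions are oblivious, they can be checked from nothing more than triples of (Y, \widehat{Y}, A). Here is a small sketch of such a check for the binary case; the function names, the tolerance parameter, and the toy data are mine, not the paper's.

```python
def positive_rate(y_true, y_pred, group, a, y):
    """Empirical estimate of P{Y_hat = 1 | A = a, Y = y} from parallel lists."""
    preds = [p for t, p, g in zip(y_true, y_pred, group) if g == a and t == y]
    if not preds:
        raise ValueError("no examples with A=%r and Y=%r" % (a, y))
    return sum(preds) / len(preds)

def satisfies_equal_opportunity(y_true, y_pred, group, tol=1e-9):
    """Equal true positive rates across A=0 and A=1."""
    gap = positive_rate(y_true, y_pred, group, 0, 1) - positive_rate(y_true, y_pred, group, 1, 1)
    return abs(gap) <= tol

def satisfies_equalized_odds(y_true, y_pred, group, tol=1e-9):
    """Equal true positive rates (y=1) and equal false positive rates (y=0)."""
    return all(
        abs(positive_rate(y_true, y_pred, group, 0, y)
            - positive_rate(y_true, y_pred, group, 1, y)) <= tol
        for y in (0, 1)
    )

# Toy data: both groups have TPR 1/2, but FPR is 0 for A=0 and 1/2 for A=1.
group  = [0, 0, 0, 0, 1, 1, 1, 1]
y_true = [1, 1, 0, 0, 1, 1, 0, 0]
y_pred = [1, 0, 0, 0, 1, 0, 1, 0]
```

On this toy data the predictor satisfies equal opportunity but not equalized odds, which illustrates that the former is the weaker condition.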

Equalisation as a post-processing step

One nice characteristic of these models is that you can start by learning a possibly discriminatory binary predictor \widehat{Y} (or score R), and then derive an equalized odds or equal opportunity predictor \widetilde{Y} from it. So we can keep an existing training pipeline untouched, and add an anti-discriminatory step on the back-end of it.

Section 4 in the paper shows how to do this for a binary predictor, and the following figure gives a geometrical interpretation of the method:

Section 4.2 shows how to extend the method to work with a thresholded score function (as in the FICO example).

See the full paper for details of the methods.


Section 6 in the paper points out that it is possible for two scenarios to be indistinguishable from their joint distribution, and yet have fundamentally different interpretations from the point of view of fairness. No oblivious test can resolve which of the two scenarios applies.

We envision our framework as providing a reasonable way of discovering and measuring potential concerns that require further scrutiny. We believe that resolving fairness concerns is ultimately impossible without substantial domain-specific investigation.


Requiring equalized odds aligns incentives such that the entity building the predictor is motivated to achieve fairness. Getting better predictions under these conditions requires collecting features that more directly capture the target Y, unrelated to its correlation with the protected attribute.

In some situations… the equalized odds predictor can be thought of as introducing some sort of affirmative action: the optimally predictive score R* is shifted based on A. This shift compensates for the fact that, due to uncertainty, the score is in a sense more biased than the target label (roughly, R* is more correlated with A than Y is correlated with A). Informally speaking, our approach transfers the burden of uncertainty from the protected class to the decision maker.

Performance analysis of cloud applications

May 4, 2018

Performance analysis of cloud applications Ardelean et al., NSDI’18

Today’s choice gives us an insight into how Google measure and analyse the performance of large user-facing services such as Gmail (from which most of the data in the paper is taken). It’s a paper in two halves. The first part of the paper demonstrates through an analysis of traffic and load patterns why the only real way to analyse production performance is using live production systems. The second part of the paper shares two techniques that Google use for doing so: coordinated bursty tracing and vertical context injection.

(Un)predictable load

Let’s start out just by considering Gmail requests explicitly generated by users (called ‘user visible requests,’ or UVRs, in the paper). These are requests generated by mail clients due to clicking on messages, sending messages, and background syncing (e.g., IMAP).

You can see a clear diurnal cycle here, with the highest QPS when both North America and Europe are active in the early morning, and lower QPS at weekends. (All charts are rescaled using some unknown factor, to protect Google information).

Request response sizes vary by about 20% over time. Two contributing factors are bulk mail senders, sending bursts of email at particular times of the day, and IMAP synchronisation cycles.

In summary, even if we consider only UVR, both the queries per second and the mix of requests change hour to hour and day to day.

If we look at the total load, we see it’s significantly less predictable than the UVR patterns would lead us to believe.

Drilling into request response size, we see that it varies by a factor of two over the course of the week and from hour to hour. The actual mix of requests to the system (and not just the count of requests) changes continuously.

There are multiple reasons for this: system tasks continuously validating data integrity and recovering from failures; the rollout of software updates; repairs due to bugs (e.g., reindexing messages), and underlying data management tasks such as Bigtable compaction. Combining these non-UVR tasks with UVR load in one chart reveals these non-UVR tasks have a significant impact on CPU usage, with UVR work directly consuming only about 20% of CPU.

Then of course, there are the one-off events, such as the time that Google’s Belgian datacenter was struck by lightning four times, potentially corrupting data on some disks. Reconstructing it and rebalancing requests while that went on caused quite a bit of disruption!

Limitations of synthetic workloads

The superimposition of UVR load, non-UVR load, and one-off events results in a complex distribution of latency and resource usage, often with a long tail. When confronted with such long-tail performance puzzles, we first try to reproduce them with synthetic users…

Despite best efforts at modelling though, the synthetic test environment often shows different behaviours to production. It can serve as a general guide though: the relationships uncovered in synthetic testing are often valid, if not the magnitude of the effect. I.e., if an optimization improves latency of an operation with synthetic users it will often also do so for real users, but by a varying amount.

For most subtle changes though, we must run experiments in a live system serving real users.

High level methodology

Users are partitioned (randomly), with one partition as the test and the other as the control. It turns out that we need large samples! Here’s an A/A test chart with two 100K user partitions – we expect them to have identical latency distributions of course. At the 95th percentile, latencies differ by up to 50% and often by at least 15%.

Only in an A/A test with partitions containing tens of millions of users do we find partition results that are largely indistinguishable from each other.

Given that many fields routinely derive statistics from fewer than 100K observations, why do our samples differ by up to 50% at some points in time? The diversity of our user population is responsible for this: Gmail users exhibit a vast spread of activity and mailbox sizes with a long tail.

As a result of this, tests are done in a live application setting with each experiment involving millions of users. Longitudinal studies take place over a week or more to capture varying load mixes. Comparisons between test and control are primarily done at the same points in time. Statistics are used to analyse the data and predict the outcome of risky experiments and to corroborate the results with other data.

Statistics: handle with care

The choice of statistical methods is not always obvious: most methods make assumptions about the data that they analyze (e.g., the distribution or independence of the data)… any time we violate the assumptions of a statistical method, the method may produce misleading results.

Here’s an example where it worked: modelling the predicted CPU demands when doubling the number of users. The distribution of CPU usage for N processes is close to normal. Thus in theory, the properties of 2N processes could be calculated using the standard methods for adding normal distributions (to add an independent copy of a distribution to itself, double the mean and multiply the standard deviation by the square root of two). The predicted CPU usage matched well with actuals (within 2%) at the 50th and 95th percentiles, but underestimated at the 99th. With the exception of this tail, the model worked well.
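As a rough illustration of that calculation, Python's standard library `statistics.NormalDist` supports adding two independent normal distributions: the means add and the variances add. The CPU figures below are made up for the sketch and are not taken from the paper.

```python
from statistics import NormalDist

# Hypothetical CPU usage of N processes (arbitrary units), not data from the paper.
cpu_n = NormalDist(mu=100.0, sigma=10.0)

# NormalDist addition treats the two operands as independent:
# the means add and the variances add.
cpu_2n = cpu_n + cpu_n

def percentile(dist, p):
    """Value below which `p` percent of the distribution falls."""
    return dist.inv_cdf(p / 100.0)

predicted = {p: percentile(cpu_2n, p) for p in (50, 95, 99)}
```

Doubling the load doubles the mean, but the standard deviation grows only by a factor of about 1.41, so the predicted tail percentiles grow more slowly than the median. That prediction only holds as long as the independence and normality assumptions do.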

Another experiment tried to model what would happen when co-locating pairs of communicating processes. This time the predictions are all over the map compared to the results:

The CPU usage of the two processes is not independent. They exhibit time-shifted CPU usage when the higher-level process makes an RPC call to the lower-level process, and then waits for a response. Because the two distributions are not independent they can’t be added together.
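A deliberately extreme toy case shows why independence matters. The usage samples below are invented: the caller is busy exactly when the callee is idle, and vice versa.

```python
from statistics import pvariance

# Hypothetical per-interval CPU usage for a caller and the process it calls.
caller = [1.0, 0.0, 1.0, 0.0, 1.0, 0.0]
callee = [0.0, 1.0, 0.0, 1.0, 0.0, 1.0]

combined = [a + b for a, b in zip(caller, callee)]

# Adding variances is only valid when the two series are independent.
independent_prediction = pvariance(caller) + pvariance(callee)
actual_variance = pvariance(combined)
```

Here the independence assumption predicts a variance of 0.5 for the co-located pair, while the actual combined usage is perfectly flat (variance 0). Real processes are nowhere near this clean, but the direction of the error is the same: correlated usage breaks the simple additive model.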

Time and motion

Before we can determine what changes we need for a cloud application to become more responsive or cost effective, we need to understand why a request to a cloud application is slow or expensive. To achieve that understanding we need to consider two contexts…

  • The temporal context of a request tells you what else was going on at the time.
  • The operation context tells you how a single request flows through many processes and software layers. The operation context helps to tease apart the differences between generally slow calls, calls that are slow with certain arguments, and calls that are individually fast but large numbers of them add up to a slow request.

On your marks, get set, trace…

The obvious approach for using sampling yet getting the temporal context is to use bursty tracing. Rather than collecting a single event at each sample, bursty-tracing collects a burst of events.

We need to capture events across processes / layers at the same point in time for this to work, which requires some kind of coordination so that everyone knows when to stop and start tracing. Google have a very pragmatic approach to this which avoids the need for any central coordination – just use the wall clock time! A burst config determines the on-and-off periods for tracing (e.g. trace for 4ms every 32ms). Bursts last for 2^n ms, with one burst every 2^(n+m) ms. This can be encoded as m 1s followed by n 0s in binary. Each process then performs tracing whenever burst-config & WallTimeMillis() == burst-config, which it can determine entirely locally.
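The bit trick is easy to check in a few lines. This is a sketch of the predicate as described above, with function names of my own choosing; the wall-clock value is passed in as a plain integer of milliseconds.

```python
def burst_config(n, m):
    """m one-bits followed by n zero-bits: bursts of 2**n ms every 2**(n+m) ms."""
    return ((1 << m) - 1) << n

def should_trace(config, wall_time_ms):
    """True when every bit set in `config` is also set in the wall-clock time."""
    return (config & wall_time_ms) == config

# The example from the text: trace for 4 ms every 32 ms, so n=2 and m=3.
config = burst_config(n=2, m=3)
on_times = [t for t in range(64) if should_trace(config, t)]
```

With this config, tracing is on for milliseconds 28 to 31 and again for 60 to 63, i.e. a 4 ms burst in every 32 ms window. Any process with the same config and an aligned clock computes the same windows without talking to anyone.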

This scheme does require aligned wall clocks across machines, but of course Google have TrueTime for that!

We use coordinated bursty tracing whenever we need to combine traces from different layers to solve a performance mystery.

System call steganography

To get the operation context, we need to track spans within and across processes. “Simply interleaving traces based on timestamps or writing to the same log file is not enough to get a holistic trace.” The simplest way from the perspective of reconstructing traces is to modify the code to propagate an operation context through all layers and tag all events with it. This can be intrusive and require non-trivial logic though.

“I have a cunning plan, my lord.”

Our approach relies on the insight that any layer of the software stack can directly cause kernel-level events by making system calls. By making a stylized sequence of innocuous system calls, any layer can actually inject information into the kernel traces.

For example, to encode ‘start of an RPC’ into a kernel trace we can call syscall(getpid, kStartRpc); syscall(getpid, RpcId);. The kernel traces the arguments passed to getpid, even though getpid ignores them. Two getpid calls are unlikely to naturally occur like this back to back. If kStartRpc is the code for an RPC start, and RpcId is an identifier for the RPC in question, we can look for this pattern in the kernel trace and recover the information. It took less than 100 lines of code to implement the injection of RPC and lock information into kernel traces using this scheme.
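The recovery side can be sketched against a simulated trace. Nothing here touches a real kernel tracer: the trace is just a list of (syscall name, first argument) pairs, and the marker value `K_START_RPC` is a number I picked for the illustration.

```python
K_START_RPC = 0x5250  # hypothetical marker value, chosen for this sketch

def inject_rpc_start(trace, rpc_id):
    """Stand-in for the two back-to-back getpid calls; appends to a fake trace."""
    trace.append(("getpid", K_START_RPC))
    trace.append(("getpid", rpc_id))

def recover_rpc_starts(trace):
    """Find getpid(K_START_RPC) immediately followed by getpid(rpc_id).
    Returns (index of the marker event, rpc_id) pairs."""
    found = []
    i = 0
    while i + 1 < len(trace):
        (name1, arg1), (name2, arg2) = trace[i], trace[i + 1]
        if name1 == "getpid" and arg1 == K_START_RPC and name2 == "getpid":
            found.append((i, arg2))
            i += 2  # skip past the pair we just consumed
        else:
            i += 1
    return found

# Simulated kernel trace with ordinary events around two injected markers.
trace = [("read", 3)]
inject_rpc_start(trace, 42)
trace.append(("write", 4))
inject_rpc_start(trace, 7)
recovered = recover_rpc_starts(trace)
```

Because the markers sit in the same event stream as everything else the kernel traced, their positions tell you which scheduling, I/O, and lock events happened during which RPC.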

We use vertical context injection as a last resort: its strength is that it provides detailed information that frequently enables us to get to the bottom of whatever performance mystery we are chasing; its weakness is that these detailed traces are large and understanding them is complex.

The strategies described in the paper have been used for analyzing and optimizing various large applications at Google including Gmail (more than 1 billion users) and Drive (hundreds of millions of users).

Stateless datacenter load-balancing with Beamer

May 3, 2018

Stateless datacenter load-balancing with Beamer Olteanu et al., NSDI’18

We’ve spent the last couple of days looking at datacenter network infrastructure, but we didn’t touch on the topic of load balancing. For a single TCP connection, you want all of the packets to end up at the same destination. Logically, a load balancer (a.k.a. ‘mux’) needs to keep some state somewhere to remember the mapping.

Existing load balancer solutions can load balance TCP and UDP traffic at datacenter scale at different price points. However, they all keep per-flow state; after a load balancer decides which server should handle a connection, that decision is “remembered” locally and used to handle future packets of the same connection. Keeping per-flow state should ensure that ongoing connections do not break when servers and muxes come or go…

There are two issues with keeping this state though. Firstly, it can sometimes end up incomplete or out of date (especially under periods of rapid network change, such as during scale out and scale in). Secondly, there’s only a finite amount of resource to back that state, which opens the door to denial of service attacks such as SYN flood attacks.

Beamer is a stateless datacenter load balancer supporting both TCP and Multipath TCP (MPTCP). It manages to keep the load balancers stateless by taking advantage of connection state already held by servers.

Our prototype implementation can forward 33 million minimum-sized packets per second on a ten core server, twice as fast as Maglev, the state of the art load balancer for TCP traffic. Our stateless design allows us to cheaply run Beamer in hardware too…

You can find Beamer at

Datacenter load balancing and problems of state

A typical load balancing deployment looks like this:

A datacenter border router is the door to the outside world. Muxes communicate with the border router using BGP, announcing the virtual IPs (VIPs) they are responsible for. A VIP is the outward facing IP address of a datacenter service. The border router uses equal-cost multipath routing (ECMP) to split the traffic equally across the muxes. Behind the VIP at a mux sits a pool of servers that actually provide the service. These destination servers have their own private addresses called direct IPs or DIPs. The mux chooses destination servers for new connections based on a hash of the five-tupleª. Changing the number of servers in the DIP pool means that at least some assignments of five-tuple hashes to DIPs will change (including those for existing connections). For this reason, once a mapping of connection to DIP is chosen it is stored locally to ensure all future packets go to the same DIP.

a: (source ip, source port, destination ip, destination port, protocol)

The mux encapsulates the original packet and sends it on to the DIP. At the receiving server the packet is extracted, the original destination address changed from the VIP to the DIP, and it is then processed by the regular TCP stack. Any replies bypass the mux: for reply packets the source address is changed from the server’s DIP to the service’s VIP and the packet is sent directly to the client (aka Direct Server Return, DSR).

Because replies bypass the muxes, it is possible for the view of the connection state to differ between a destination server and a mux. For example, if a client disconnects after sending a SYN packet the mux will remember the mapping for minutes, but the chosen back-end server will terminate the connection after a few failed attempts at establishing it. SYN-flood attacks exploit this by deliberately creating large numbers of half-open connections until resources are exhausted. Defending against SYN-flood attacks requires keeping even more state at the muxes (SYN cookies), and/or terminating TCP at the mux. Both solutions limit scalability.

Scale out events which add more muxes and back-end servers can also cause problems. When we add a mux, the border router will start sending it traffic. If some of that traffic is for existing connections, the new mux won’t have any saved connection state for them. The 5-tuple hash would choose the same destination IP as the original mux though, so long as the number of backend servers is not changing at the same time. Which it usually is.

Beamer’s secret sauce: stable hashing and daisy-chaining

Consider a scale-out event as we just discussed, in which we start out with mux 1 and destination server A, and add mux 2 and destination server B.

Traffic for an existing connection terminated at A might now start following the blue path (caused by ECMP balancing and different hash results inside mux 2 due to the change in size of the DIP pool). Normally, when B receives a packet for a connection it doesn’t know about, it would just reset the connection. In Beamer, B does not reset the connection, instead it forwards packets it doesn’t have state for to A, where they can be processed normally. (How B knows to send them to A we’ll come to shortly). This forwarding between backend servers is called daisy chaining and is the core idea behind Beamer. During times of stability, we won’t need to avail ourselves of the daisy chaining facility, but under change we can use it to keep things running without interruption. Since the load balancer was only retaining state to keep consistent routing in the face of change, we now don’t need state in the load balancer!

There are three key parts to Beamer:

  1. A stable hashing algorithm that reduces the amount of churn caused by DIP pool changes. This ensures we don’t need to daisy chain very often, keeping performance high.
  2. An in-band signalling mechanism that gives servers enough information to do daisy chaining when they need to.
  3. A fault-tolerant control plane that scalably disseminates data plane configurations to all muxes.

Stable hashing

A simple hashing algorithm would be to use hash(5tuple) % N where N is the number of DIPs. This causes a lot of churn in assignments when N changes though. Consistent hashing, rendezvous hashing, and Maglev hashing all combine good load balancing with reduced disruption under churn. A downside is that muxes end up with lots of matching rules, reducing performance.

Beamer’s answer is a layer of indirection called stable hashing. Before load balancing starts for a given VIP the operator determines a fixed number of buckets B, such that B is larger than the maximum number of expected DIPs in the pool (e.g., B = 100N). Each bucket is assigned to a single server at a time, but a server can be responsible for multiple buckets simultaneously. The number of buckets and bucket to server assignments are known by all muxes (yes, that’s state!!) and disseminated via a separate control plane mechanism. Strictly then, Beamer muxes are not stateless, but critically they don’t maintain any per-flow state.

When a packet arrives, muxes hash it to a bucket by computing b=hash(5tuple) % B and then forward the packet to the server currently assigned bucket b. As B is constant by construction, server churn does not affect the mapping result: a connection always hashes to the same bucket regardless of the number of active DIPs.
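The difference between the two schemes shows up clearly in a small simulation. This is a sketch under assumptions: the hash function (SHA-256 over the tuple's repr), the flow addresses, the bucket count, and the round-robin bucket assignment are all stand-ins I chose, not Beamer's actual hash or its greedy assignment algorithm.

```python
import hashlib

def flow_hash(five_tuple):
    """Deterministic hash of a five-tuple (built-in hash() is salted per process)."""
    digest = hashlib.sha256(repr(five_tuple).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")

def naive_pick(five_tuple, dips):
    """hash(5tuple) % N: the result depends on the current pool size."""
    return dips[flow_hash(five_tuple) % len(dips)]

def stable_pick(five_tuple, bucket_to_dip):
    """hash(5tuple) % B with B fixed: only a bucket's owner can change."""
    return bucket_to_dip[flow_hash(five_tuple) % len(bucket_to_dip)]

NUM_BUCKETS = 400  # fixed up front; illustrative value
flows = [("10.0.%d.%d" % (i // 250, i % 250), 1024 + i, "192.0.2.1", 80, "tcp")
         for i in range(1000)]

old_dips = ["A", "B", "C"]
new_dips = ["A", "B", "C", "D"]

# Initial bucket assignment, then hand every fourth bucket to the new server D.
old_buckets = [old_dips[b % 3] for b in range(NUM_BUCKETS)]
new_buckets = ["D" if b % 4 == 3 else old_buckets[b] for b in range(NUM_BUCKETS)]

naive_moved = sum(naive_pick(f, old_dips) != naive_pick(f, new_dips) for f in flows)
stable_moved = sum(stable_pick(f, old_buckets) != stable_pick(f, new_buckets) for f in flows)
print("flows remapped: naive=%d stable=%d" % (naive_moved, stable_moved))
```

With modulo-N hashing, most flows land on a different server when the pool grows from three to four. With a fixed bucket count, the only flows that move are those in buckets deliberately reassigned to the new server, and those are exactly the flows daisy chaining has to cover.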

Bucket-to-server mappings, which change over time, are stored in a coordination service (Apache ZooKeeper in the implementation), and muxes retrieve the latest version before serving traffic. Changes to bucket-to-DIP mappings are rare, so the mechanism has low coordination overhead.

Bucket-to-server mappings can be managed using consistent hashing or similar. Beamer uses a greedy assignment algorithm that aims to maximise contiguous bucket ranges assigned to servers, which reduces the number of rules needed in the muxes and is especially useful for hardware deployments.

Daisy chaining

Daisy chaining is used to cope with the period when a bucket-to-server assignment has changed, but there are still in-flight connections with the old destination server. On reconfiguration, a mux saves the previous DIP for each bucket along with the time the reallocation took place. Encapsulated packets from the mux to the (new) DIP carry this information with them.

When a packet arrives at a backend server it is processed locally if it is a SYN, a valid SYN-cookie ACK, or it belongs to a local connection. Otherwise, the server inspects the last-bucket-change timestamp in the packet. If it is within the daisy chaining timeout (a hard limit of four minutes in the implementation) then the previous DIP is extracted and the packet is forwarded. If we are outside of the time limit the packet is dropped and a RST is sent back to the source.
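The server-side decision just described fits in one function. This is a sketch, not Beamer's code: the packet is modelled as a dict whose field names are hypothetical, and timestamps are plain seconds.

```python
DAISY_CHAIN_TIMEOUT_S = 4 * 60  # the four-minute hard limit mentioned above

def handle_packet(pkt, local_connections, now):
    """Return ("local", None), ("forward", previous_dip) or ("reset", None).
    `pkt` uses hypothetical field names: is_syn, valid_syn_cookie_ack, flow,
    prev_dip, bucket_changed_at."""
    if pkt["is_syn"] or pkt["valid_syn_cookie_ack"] or pkt["flow"] in local_connections:
        return ("local", None)
    # Not ours: daisy chain to the previous owner if the bucket moved recently.
    recently_moved = now - pkt["bucket_changed_at"] <= DAISY_CHAIN_TIMEOUT_S
    if pkt["prev_dip"] is not None and recently_moved:
        return ("forward", pkt["prev_dip"])
    # Outside the time limit: drop the packet and send a RST to the source.
    return ("reset", None)

# A mid-connection packet for a bucket that moved 60 seconds ago.
pkt = {"is_syn": False, "valid_syn_cookie_ack": False, "flow": "f1",
       "prev_dip": "10.0.0.1", "bucket_changed_at": 1000.0}
decision = handle_packet(pkt, local_connections=set(), now=1060.0)
```

Note that the server needs no table of its own to make this choice: everything required to forward is carried in the encapsulated packet, which is what keeps the muxes free of per-flow state.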

There’s one more problem that can occur when rolling out an update across muxes. Suppose we start with a configuration including only mux 1, and then mux 2 comes online and a bucket reconfiguration is rolled out. A connection is routed via mux 2 (which has the new configuration) to backend server A. If via ECMP subsequent packets end up being sent to mux 1, before mux 1 has switched to the new configuration, then mux 1 will know nothing about the new bucket-to-server mapping and may well route the packet to a different backend without the correct daisy chaining information. To solve this an epoch (generation) number is also included in the packets.

When (a backend server) receives a mid-connection packet that can not be daisy chained and for which it has no state, it will check if the generation number from the mux equals the highest generation number seen; if yes, the connection will be reset. If not, the server silently discards the packet. This will force the client to retransmit the packet, and in the meantime the stale mux mappings will be updated to the latest generation, solving the issue. Note that, if the border router uses resilient hashing, the mechanism above becomes nearly superfluous.
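The generation check adds one more branch for packets that have no local state and cannot be daisy chained. A minimal sketch, with names of my own choosing and generations modelled as plain integers:

```python
def handle_unchainable_packet(mux_generation, highest_generation_seen):
    """Decision for a mid-connection packet with no local state and no
    usable daisy-chaining information."""
    if mux_generation == highest_generation_seen:
        # The mux is up to date, so the connection really is unknown: reset it.
        return "reset"
    # The mux is probably stale: discard silently and let the client retransmit.
    return "discard"
```

The silent discard buys time: by the time the client retransmits, the stale mux has usually picked up the latest configuration and the packet takes the right path.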


Beamer can also handle MPTCP traffic (all iOS-based phones have MPTCP, as do high end Android devices). MPTCP connections contain multiple subflows. Beamer uses the (otherwise unused) destination port in SYN JOIN packets to encode the server identifier for secondary subflows. Ports 1-1024 are reserved for actual services; port numbers in the range 1025-65535 are used to encode server identifiers.

Whenever a new MPTCP connection is established, servers send an ACK with add address option to the client with the VIP address and the server identifier as port number. The client remembers this new address/port combination and will send subsequent subflows to it.

The control plane

The control plane is built on top of ZooKeeper and uses two-phase commit to keep copies in sync. Only the controller writes to ZooKeeper, and muxes only read. Backend servers do not interact with ZooKeeper at all.

See §4.5 for the fine print.

Evaluation results

Experiments on EC2 tested Beamer with one hundred muxes, 64K DIPs, and 6.4 million buckets.

Our results show that Beamer is simultaneously fast and robust: no connections are ever dropped, in contrast to stateful approaches, Beamer’s dataplane performance is twice that of the best existing software solution, and our mux introduces negligible latency when underloaded (100µs).


With a single server sourcing around 1-10Gbps of traffic, a single software mux could cater for 50-500 servers. Testing a P4 prototype implementation using a hardware simulator showed that a P4 mux can handle around 60Mpps.

The evaluation section also contains an analysis of the impact of mux churn (§6.2), MPTCP load balancing (§6.3), and controller scalability (§6.4), which I don’t have space to cover here. The short version is that everything seems to work well!

Our experiments show that Beamer is not only fast, but also extremely robust to mux and server addition, removal, or failures as well as heavy SYN flood attacks.

Andromeda: performance, isolation, and velocity at scale in cloud network virtualization

May 2, 2018

Andromeda: performance, isolation, and velocity at scale in cloud network virtualization Dalton et al., NSDI’18

Yesterday we took a look at the Microsoft Azure networking stack, today it’s the turn of the Google Cloud Platform. (It’s a very handy coincidence to have two such experience and system design report papers appearing side by side so that we can compare). Andromeda has similar design goals to AccelNet: performance close to hardware, serviceability, and the flexibility and velocity of a software-based architecture. The Google team solve those challenges in a very different way though, being prepared to make use of host cores (which you’ll recall the Azure team wanted to avoid).

We opted for a high-performance software-based architecture instead of a hardware-only solution like SR-IOV because software enables flexible, high-velocity feature deployment… Andromeda consumes a few percent of the CPU and memory on-host. One physical CPU core is reserved for the Andromeda dataplane… In the future, we plan to increase the dataplane CPU reservation to two physical cores on newer hosts with faster physical NICs and more CPU cores in order to improve VM network throughput.

High-level design

Both the control plane and data plane use a hierarchical structure. The control plane maintains information about every VM in the network, together with all higher-level product and infrastructure state such as firewalls, load-balancers, and routing policy. It is designed around a global hierarchy in conjunction with the overall cloud cluster management layer.

The data plane offers a set of flexible user-space packet processing paths:

  • High-performance, latency critical flows are processed end-to-end on a fast path with a per-packet budget of 300ns.
  • On-host software coprocessors running in per-VM floating threads perform per-packet work that is CPU-intensive or without strict latency targets. Coprocessors allow feature growth without compromising the fast path.
  • Packets that don’t match a flow rule on the VM host are sent to hoverboards: dedicated gateways that perform virtual network routing. Only active flows are installed on VM hosts, based on current communication patterns, which allows the long tail of mostly idle flows to be processed by hoverboards.

Avoiding the need to install full-forwarding information on every host improves per-server memory utilization and control-plane scalability by over an order of magnitude.

The control plane

The control plane is made up of three layers. The uppermost layer, cluster-management, provisions resources on behalf of users. Its functionality is not networking-specific. When the cluster management layer needs to configure virtual networks, it uses the fabric management layer services to do so. VM Controllers (VMCs) within the fabric layer program switches using a convergence model to bring switches in line with the desired state. At the bottom of the stack we find the switch layer dealing with on-host switches and hoverboards. Each VM host has a virtual switch based on Open vSwitch which handles all traffic for all VMs on the host. Hoverboards are standalone switches.

Multiple VMC partitions are deployed in every cluster, with each partition responsible for a fraction of the cluster hosts based on consistent hashing. Each VMC partition consists of a Chubby-elected master (an interesting use case for an in-switch coordination service such as NetChain?) and two standbys. The overall control plane is split into a regionally aware control plane (RACP) and a globally aware control plane (GACP). This helps provide fault isolation barriers to prevent problems cascading across regions.

Every layer of the control plane is designed to fail static. For example, hosts continue to forward packets using the last-known-good programming state, even if VMCs are unavailable. Hosts checkpoint their state to allow them to preserve the fail static behavior across host process restarts, and we routinely test fail-static behavior by simulating outages in a test environment.

Andromeda supports migration across clusters with no packet loss. This use case was one of the key factors in avoiding the use of hardware-only SR-IOV-based solutions.

The control layer needs to scale to millions of individual VMs with fast reconfiguration. There are three basic approaches to programming SDNs:

  1. In the pre-programmed model, the control plane programs a full mesh of programming rules from each VM to every other VM. This of course scales quadratically with network size and requires mass propagation of state on network changes. It’s not going to work well at Google scale.
  2. In the on-demand model, the first packet of a flow is sent to the controller, which then programs the required forwarding rule. This means that the first packet has high latency, and the model is very sensitive to control plane outages and opens the door to accidental or malicious DoS attacks on the control plane. Rate limiting across tenants in this scenario is complex.
  3. In the gateway model, all packets of a specific type (e.g., destined for the Internet) are sent to a gateway device. This provides predictable performance and control plane scalability, but can lead to a large number of gateways.

Andromeda’s hoverboard model is designed to combine the benefits of the on-demand and gateway approaches. Initially all packets for which the VM host stack does not have a route are sent to Hoverboard gateways. The controller detects flows that exceed a specified usage threshold and programs offload flows for them: direct host-to-host flows bypassing the hoverboards. Since the distribution of flow bandwidth tends to be highly skewed, a small number of offload flows divert the vast majority of traffic in the cluster away from the hoverboards.
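To make the offload idea concrete, here is a minimal sketch of the selection step. It is not Andromeda's controller logic: the function names, the byte-count threshold, and the traffic figures are all assumptions chosen to show how a skewed distribution lets a handful of offload flows carry almost all of the bytes.

```python
def select_offload_flows(flow_bytes, threshold_bytes):
    """Return the flows whose observed usage exceeds the (assumed) threshold."""
    return {flow for flow, used in flow_bytes.items() if used > threshold_bytes}

def route(flow, offloaded):
    """Direct host-to-host if an offload flow is installed, else via a hoverboard."""
    return "direct" if flow in offloaded else "hoverboard"

# Hypothetical, highly skewed usage: two heavy flows and a long tail of idle ones.
flow_bytes = {("vm-a", "vm-b"): 9_000_000, ("vm-a", "vm-c"): 6_000_000}
flow_bytes.update({("vm-a", f"vm-idle-{i}"): 1_000 for i in range(100)})

offloaded = select_offload_flows(flow_bytes, threshold_bytes=100_000)
direct_share = sum(flow_bytes[f] for f in offloaded) / sum(flow_bytes.values())

print(len(offloaded), "of", len(flow_bytes), "flows offloaded")
print(f"{direct_share:.1%} of bytes bypass the hoverboards")
```

In this toy data, two installed flows out of 102 are enough to take over 99% of the bytes off the hoverboards, which is the property the design leans on.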

The data plane

The VM host dataplane is a userspace process that performs all on-host VM packet processing, combining both virtual NIC and virtual switch functionality… Our overall dataplane design philosophy is flexible, high-performance software coupled with hardware offloads. A high performance software dataplane can provide performance indistinguishable from the underlying hardware.

Sufficient fast-path CPU is provisioned to meet throughput targets. Encryption, checksums, and memory copies (using Intel QuickData DMA Engines) are offloaded to hardware. Only performance critical low-latency work runs on the fast-path; everything else belongs on the co-processor path. The fast path avoids locks and synchronisation, carefully optimises memory locality, uses hugepages, avoids thread handoffs and system calls, and uses end-to-end batching. Developing co-processor features is easier because they don’t have to strictly follow all of these practices: for example, they may acquire locks, allocate memory, and perform system calls.

To ensure middlebox functionality does not degrade fast path performance the control plane pre-computes any per-flow middlebox packet stage work as part of installing a flow. For example, Google use an always-on connection tracking firewall. To minimise per-flow work the firewall rules are examined on a flow miss. If the IP addresses and protocol in the flow are always allowed in both directions, then no firewall work is needed in the fast path. Otherwise a flow firewall policy is pre-computed (indicating allowed port ranges for the flow IPs), and the fast path just matches packets against these port ranges, which is much faster than evaluating the full firewall policy.
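The split between slow-path pre-computation and fast-path matching might look roughly like the sketch below. The rule format, the names, and the default-deny behaviour are my assumptions, and for brevity it only checks one direction of the flow, whereas the paper's check covers both.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass(frozen=True)
class FlowPolicy:
    # None means "always allowed": the fast path does no firewall work at all.
    allowed_ports: Optional[Tuple[Tuple[int, int], ...]]

def precompute_policy(rules, src_ip, dst_ip, proto):
    """Slow path, run on a flow miss: reduce the full rule set to port ranges."""
    ranges = []
    for rule in rules:
        if (rule["src"], rule["dst"], rule["proto"]) == (src_ip, dst_ip, proto):
            if rule["ports"] is None:  # this rule allows every port
                return FlowPolicy(None)
            ranges.append(rule["ports"])
    return FlowPolicy(tuple(ranges))  # empty tuple: nothing allowed (assumed default deny)

def fast_path_allows(policy, dst_port):
    """Fast path: a few range comparisons per packet, no rule evaluation."""
    if policy.allowed_ports is None:
        return True
    return any(lo <= dst_port <= hi for lo, hi in policy.allowed_ports)

# Hypothetical rule set.
rules = [
    {"src": "10.0.0.1", "dst": "10.0.0.2", "proto": "tcp", "ports": (80, 80)},
    {"src": "10.0.0.1", "dst": "10.0.0.2", "proto": "tcp", "ports": (8000, 8100)},
    {"src": "10.0.0.3", "dst": "10.0.0.2", "proto": "tcp", "ports": None},
]
restricted = precompute_policy(rules, "10.0.0.1", "10.0.0.2", "tcp")
unrestricted = precompute_policy(rules, "10.0.0.3", "10.0.0.2", "tcp")
```

The point of the design is that `precompute_policy` runs once per flow, while `fast_path_allows` runs per packet and never touches the full rule list.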

Fast path flow table lookup is used to determine the coprocessor stages that are enabled for a packet. If coprocessor stages are enabled, the packet is sent to the appropriate coprocessor thread via an SPSC (single-producer, single-consumer) packet ring.
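For readers who haven't met an SPSC ring before, here is a single-threaded illustration of the index arithmetic. It is only a sketch: the class name is mine, and a real dataplane ring would be written in a systems language and rely on memory-ordering guarantees that this Python version does not model.

```python
class SpscRing:
    """Fixed-capacity ring: the producer only advances `tail`, the consumer only `head`."""

    def __init__(self, capacity):
        # One slot stays empty so that "full" and "empty" are distinguishable.
        self._slots = [None] * (capacity + 1)
        self._head = 0  # next slot to read (owned by the consumer)
        self._tail = 0  # next slot to write (owned by the producer)

    def push(self, packet):
        nxt = (self._tail + 1) % len(self._slots)
        if nxt == self._head:
            return False  # ring full; the caller decides whether to drop or retry
        self._slots[self._tail] = packet
        self._tail = nxt
        return True

    def pop(self):
        if self._head == self._tail:
            return None  # ring empty
        packet = self._slots[self._head]
        self._head = (self._head + 1) % len(self._slots)
        return packet

ring = SpscRing(capacity=2)
accepted = [ring.push(p) for p in ("p1", "p2", "p3")]  # third push finds the ring full
drained = [ring.pop(), ring.pop(), ring.pop()]         # third pop finds it empty
```

Because each index has exactly one writer, the fast path can hand packets to a coprocessor thread without taking a lock.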


Andromeda has evolved over a five-year period, enabling us to see the impact of various design changes that were made during this period.

  • Pre-Andromeda the dataplane was implemented entirely in the VMM using UDP sockets for packet I/O.
  • Andromeda 1.0 included an optimised VMM packet pipeline and a modified kernel Open vSwitch.
  • Andromeda 1.5 added ingress virtual NIC offloads and coalesced redundant lookups in the VMM with the kernel OVS flow table lookup.
  • Andromeda 2.0 consolidated prior VMM and host kernel packet processing into a new OS-bypass busy-polling userspace dataplane.
  • Andromeda 2.1 directly accesses virtual NIC rings in the dataplane, bypassing the VMM.
  • Andromeda 2.2 uses Intel QuickData DMA Engines to offload larger packet copies.

The following charts show the impact of these changes on throughput and latency:

You can also see the reducing demands on the CPU with increasing offloads:

In the control plane, using the original pre-programming approach, it was not possible to get beyond about 40K VMs. At this level, VMCs are programming 487M flows in 74 seconds, using 10GB RAM per partition. With the introduction of hoverboards only 1.5M flows need to be programmed, which can be done in 1.9 seconds with 513MB RAM per partition.
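A quick back-of-the-envelope check of those figures shows where the "over an order of magnitude" claim comes from. The only assumption is the unit conversion of 1 GB = 1000 MB.

```python
# Figures quoted above; RAM converted assuming 1 GB = 1000 MB.
before = {"flows": 487e6, "seconds": 74.0, "ram_mb": 10_000.0}
after = {"flows": 1.5e6, "seconds": 1.9, "ram_mb": 513.0}

reduction = {name: before[name] / after[name] for name in before}
for name, factor in reduction.items():
    print(f"{name}: about {factor:.0f}x lower with hoverboards")
```

That works out to roughly 325x fewer flows, 39x less programming time, and 19x less memory per partition.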


In the future, we will offload more substantial portions of the Fast Path to hardware. We will continue to improve scalability, as lightweight virtual network endpoints such as containers will result in much larger and more dynamic virtual networks.

Azure accelerated networking: SmartNICs in the public cloud

May 1, 2018

Azure accelerated networking: SmartNICs in the public cloud Firestone et al., NSDI’18

We’re still on the ‘beyond CPUs’ theme today, with a great paper from Microsoft detailing their use of FPGAs to accelerate networking in Azure. Microsoft have been doing this since 2015, and hence this paper also serves as a wonderful experience report documenting the thought processes that led to an FPGA-based design, and lessons learned transitioning an all-software team to include hardware components.

There’s another reminder here too of the scale at which cloud vendors operate, which makes doing a project like this viable. The bulk purchase of FPGAs keeps their cost low, and the scale of the project makes the development investment worthwhile.

One question we are often asked is if FPGAs are ready to serve as SmartNICs more broadly outside Microsoft… We’ve observed that necessary tooling, basic IP blocks, and general support have dramatically improved over the last few years. But this would still be a daunting task for a new team… The scale of Azure is large enough to justify the massive development efforts — we achieved a level of performance and efficiency simply not possible with CPUs, and programmability far beyond an ASIC, at a cost that was reasonable because of our volume. But we don’t expect this to be a natural choice for anyone beyond a large-scale cloud vendor until the ecosystem evolves further.

The investment in AccelNet certainly seems to be paying off when it comes to networking performance in Azure. Here’s a comparison of VM-VM throughput, average latency, and tail (P99) latency across Azure, AWS, and GCP:


The networking challenge with virtual infrastructure

In traditional virtual machine networking all network I/O to and from a physical device is performed in the host software partition of the hypervisor, with every packet sent and received processed by a virtual switch in the host networking stack. This of course burns a lot of CPU and increases latency.

SR-IOV compliant hardware enables PCI Express (PCIe) device hardware to be shared among multiple VMs. Bypassing the hypervisor makes things go much faster, but it also bypasses all the host SDN policies. That’s a problem because infrastructure-as-a-service platforms have to provide rich network semantics (private virtual networks, software load balancers, virtual routing tables, bandwidth monitoring and so on). In Azure, these features are implemented in the Virtual Filtering Platform (VFP), a cloud-scale programming vSwitch providing scalable SDN policy for Azure.

The challenge is to get performance levels close to SR-IOV, while still delivering the SDN flexibility provided by VFP.

Design goals and constraints

An initial analysis revealed that any solution would need to satisfy the following:

  1. It shouldn’t consume host CPU cores – the Azure business depends on renting these out to customers, and overheads should be kept to a minimum.
  2. Retain a high degree of programmability. Offloading every rule to hardware is neither feasible nor desirable. The flexibility of VFP should be maintained without any knowledge that policy is being offloaded.
  3. Achieve latency, throughput, and utilisation on a par with SR-IOV hardware. (AccelNet achieves this for all but the very first packet of each flow).
  4. Support adding and changing actions over time. “We were, and continue to be, very wary of designs that locked us into a fixed set of flow actions.”
  5. Support frequent deployment of new functionality in the existing hardware fleet, and not just on new servers.
  6. Provide high single connection performance. A single CPU core generally can’t achieve peak bandwidth performance at 40Gb and higher (and 100Gb is coming!). To get beyond this requires the use of multiple threads, spreading traffic across multiple connections. This would require substantial changes to customer applications. “An explicit goal of AccelNet is to allow applications to achieve near-peak bandwidths without parallelizing the network processing in their application.”
  7. Have a path that will scale to 100GbE and beyond.
  8. Support full serviceability with live migration and no loss of flow state. One of the key lessons learned is the entire system, from hardware to software, must be designed to be serviceable and monitorable from day one.

Serviceability cannot be bolted on later.

Evaluating hardware choices

ASICs, multi-core SoCs, or FPGAs?


Custom ASIC designs provide the highest performance potential, but they’re harder to program and even harder to adapt over time. With a time lag of 1-2 years between requirement specifications and the arrival of silicon, the hardware was already behind requirements before it arrived. Couple that with a 5 year server lifespan and no easy way to retrofit most servers at scale, and you’re actually looking at a design that has to last for 7 years into the future. That’s like predicting what you need today back in 2011!

So ASICs are out.

Multicore SoC-based NICs

Multicore SoC-based NICs use a sea of embedded CPU cores to process packets, trading some performance to provide substantially better programmability than ASIC designs.

At higher network speeds of 40GbE and above though, the number of required cores increases significantly and scattering and gathering packets becomes increasingly inefficient. “… 40GbE parts with software-based datapaths are already surprisingly large, power hungry, and expensive, and their scalability for 100GbE, 200GbE, and 400GbE looks bleak.”

So multicore SoC-based NICs are out.


FPGAs balance the performance of ASICs with the programmability of SoC NICs.

The key characteristics of FPGAs that made it attractive for AccelNet were the programmability to adapt to new features, the performance and efficiency of customized hardware, and the ability to create deep processing pipelines, which improve single-flow performance.

Note that Microsoft had already been running a multi-thousand node cluster of networked FPGAs doing search ranking for Bing, which improved their confidence in the viability of an FPGA-based design. However,…

… our networking group, who had until then operated entirely as a software group, was initially skeptical – even though FPGAs are widely used in networking in routers, applications, and appliances they were not commonly used as NICs or in datacenter servers, and the team didn’t have significant experience programming or using FPGAs in production settings.

Dedicated FPGA developers were added to the team, but not many— fewer than 5 at any given time.

AccelNet in brief

AccelNet places the FPGA as a bump-in-the-wire between the NIC and the Top of Rack (TOR) switch, making it a filter on the network. The FPGA is also connected by 2 PCIe connections to the CPUs, so that it can be used for accelerator workloads like AI and web search. Below you can see pictures of (a) the first, and (b) the second generation of AccelNet boards.


The control plane is largely unchanged from the original VFP design and runs almost entirely in the hypervisor. The Generic Flow Table based datapath design is implemented in hardware. There are two deeply pipelined packet processing units, each with four major pipeline stages:

  1. A store and forward packet buffer
  2. A parser
  3. A flow lookup and match
  4. A flow action


A key design goal is that TCP flows and vNICs should survive FPGA reconfiguration, FPGA driver updates, NIC PF driver updates, and Generic Flow Table updates. This is achieved via an intermediary (failsafe) Poll Mode Driver (PMD) which can bind to either the accelerated data path or the host software stack. When the hardware needs to be updated the failsafe driver switches to the VMBUS channels and the application sees no interruption bar reduced performance for a period of time.

Lessons learned

I raced through the description of AccelNet’s design, so that we could spend some time and space on lessons learned from the project, which are of interest beyond just this particular project.

  1. Design for serviceability upfront. This was the hardest part to get right and required the whole system including hardware and software to be designed to be serviceable and monitorable from day one.
  2. Use a unified development team. “If you want hardware/software co-design, hardware devs should be in the same team as software devs.”
  3. Use software development techniques for FPGAs – treat and ship hardware logic as if it were software.
  4. Better performance leads to better reliability. By separating the network datapath such that it no longer shares cores or resources with the host, not only does it perform better, but many sources of interference are eliminated leading to greater reliability.
  5. Hardware/software design is best when iterative. ASICs force a waterfall-like approach, but with FPGAs hardware developers could be far more agile in their approach.
  6. Failure rates remained low, with FPGAs proving reliable in datacenters worldwide (DRAM failed the most by the way).
  7. Upper layers should be agnostic of offloads. The offload acceleration was transparent to controllers and upper layers, making AccelNet much less disruptive to deploy.
  8. As a nice bonus, “because AccelNet bypasses the host and CPUs entirely, our AccelNet customers saw significantly less impact to network performance, and many redeployed older tenants to AccelNet-capable hardware just to avoid these impacts.”

Future work will describe entirely new functionality we’ve found we can support now that we have programmable NICs on every host.

NetChain: Scale-free sub-RTT coordination

April 30, 2018

NetChain: Scale-free sub-RTT coordination Jin et al., NSDI’18

NetChain won a best paper award at NSDI 2018 earlier this month. By thinking outside of the box (in this case, the box is the chassis containing the server), Jin et al. have demonstrated how to build a coordination service (think Apache ZooKeeper) with incredibly low latency and high throughput. We’re talking 9.7 microseconds for both reads and writes, with scalability on the order of tens of billions of operations per second. Similarly to KV-Direct that we looked at last year, NetChain achieves this stunning performance by moving the system implementation into the network. Whereas KV-Direct used programmable NICs though, NetChain takes advantage of programmable switches, and can be incrementally deployed in existing datacenters.

We expect a lightning fast coordination system like NetChain can open the door for designing a new generation of distributed systems beyond distributed transactions.

It’s really exciting to watch all of the performance leaps being made by moving compute and storage around (accelerators, taking advantage of storage pockets e.g. processing-in-memory, non-volatile memory, in-network processing, and so on). The sheer processing power we’ll have at our disposal as all of these become mainstream is staggering to think about.

The big idea

Coordination services (e.g. ZooKeeper, Chubby) are used to synchronise access to resources in distributed systems providing services such as configuration management, group membership, distributed locking, and barriers. Because they offer strong consistency guarantees, their usage can become a bottleneck. Today’s server based solutions require multiple RTTs to process a query. Clients send requests to coordination servers, which execute a consensus protocol (e.g. Paxos), and then send a reply back to the client. The lower bound is one RTT (as achieved by e.g. NOPaxos).

Suppose for a moment we could distribute coordination service state among the network switches instead of using servers, and that we could run the consensus protocol among those switches. Switches process packets pretty damn fast, meaning that the query latency can come down to less than one RTT!

We stress that NetChain is not intended to provide a new theoretical answer to the consensus problem, but rather to provide a systems solution to the problem. Sub-RTT implies that NetChain is able to provide coordination within the network, and thus reduces the query latency to as little as half of an RTT. Clients only experience processing delays caused by their own software stack plus a relatively small network delay. Additionally, as merchant switch ASICs can process several billion packets per second (bpps), NetChain achieves orders of magnitude higher throughput, and scales out by partitioning data across multiple switches…

Modern programmable switch ASICs provide on-chip storage for user-defined data that can be read and modified for each packet at line rate. Commodity switches have tens of megabytes of on-chip SRAM. Datacenter networks don’t use all of this, and so a large proportion can be allocated to NetChain. A datacenter with 100 switches, allocating 10MB per switch, can store 1GB in total, or 333MB effective storage with a replication factor of three. That’s enough for around 10 million concurrent locks for example. With distributed transactions that take 100 µs, NetChain could provide 100 billion locks per second – which should be enough for a while at least! Even just three switches would accommodate 0.3 million concurrent locks, or about 3 billion locks per second. There’s a limitation on individual value sizes of about 192 bytes though for full speed.
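The capacity numbers above are easy to reproduce. In this sketch the function names are mine, and the bytes-per-lock figure is not stated anywhere: it is simply what "10 million locks in 333MB" implies.

```python
def effective_storage_mb(switches, mb_per_switch, replication_factor):
    return switches * mb_per_switch / replication_factor

def lock_rate_per_second(concurrent_locks, txn_seconds):
    # Each lock slot can be handed out once per transaction duration.
    return concurrent_locks / txn_seconds

big = effective_storage_mb(100, 10, 3)       # about 333 MB
bytes_per_lock = big * 1e6 / 10e6            # implied, not quoted: about 33 bytes
small = effective_storage_mb(3, 10, 3)       # 10 MB with just three switches
small_locks = small * 1e6 / bytes_per_lock   # about 0.3 million concurrent locks

big_rate = lock_rate_per_second(10e6, 100e-6)
small_rate = lock_rate_per_second(small_locks, 100e-6)

print(f"{big:.0f} MB effective, {bytes_per_lock:.1f} bytes per lock (implied)")
print(f"{big_rate:.0e} locks/s with 100 switches, {small_rate:.0e} with 3")
```

Both quoted rates fall out of the same division: concurrent locks over the 100 µs transaction time.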

We suggest that NetChain is best suited for small values that need frequent access, such as configuration parameters, barriers, and locks.

Consistent hashing is used to partition the key-value store over multiple switches.
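As a reminder of how consistent hashing can map a key to a chain of switches, here is a generic sketch. It is the textbook technique with virtual nodes, not NetChain's implementation: the hash function, the number of virtual nodes, and all names are assumptions.

```python
import bisect
import hashlib

def _h(value):
    """Map a string to a point on the ring (first 8 bytes of SHA-256)."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")

class Ring:
    def __init__(self, switches, vnodes_per_switch=100):
        # Each switch appears at many points so that load spreads evenly.
        self._points = sorted(
            (_h(f"{sw}#{i}"), sw) for sw in switches for i in range(vnodes_per_switch)
        )
        self._hashes = [h for h, _ in self._points]

    def chain_for(self, key, replicas=3):
        """Walk clockwise from the key's position, collecting distinct switches."""
        start = bisect.bisect_right(self._hashes, _h(key))
        chain = []
        for offset in range(len(self._points)):
            sw = self._points[(start + offset) % len(self._points)][1]
            if sw not in chain:
                chain.append(sw)
                if len(chain) == replicas:
                    break
        return chain

switches = ["s1", "s2", "s3", "s4"]
ring = Ring(switches)
chain = ring.chain_for("lock/a")  # three distinct switches: head, middle, tail
```

A client-side agent only needs the ring itself to work out which switches hold a key, so very little state has to be kept per key.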

In-Network processing + Chain replication = NetChain

So we’ve convinced ourselves that there’s enough in-switch storage to potentially build an interesting coordination system using programmable switches. But how do we achieve strong consistency and fault-tolerance?

Vertical Paxos divides the consensus protocol into two parts; a steady state protocol, and a reconfiguration protocol. These two parts can be naturally mapped to the network data and control planes respectively. Both read and write requests are therefore processed directly in the switch data plane without controller involvement. The controller handles system reconfigurations such as switch failures, and doesn’t need to be as fast because these are comparatively rare events.

For the steady state protocol, NetChain uses a variant of chain replication. Switches are organised in a chain structure with read queries handled by the tail and write queries sent to the head, processed by each node along the chain, and replied to at the tail.

Queries are routed according to the chain structure, building on top of existing underlay routing protocols. Each switch is given an IP address, and an IP list of chain nodes is stored in the packet header. The destination node in the IP header indicates the next chain node. When a switch receives a packet and the destination IP matches its own address, it decodes the query and performs the read or write operation. Then it updates the destination node to the next chain node, or to the client IP if it is the tail.

Write queries store chain IP lists as the chain order from head to tail; read queries use the reverse order (switch IPs other than the tail are used for failure handling…). The chain IP lists are encoded to UDP payloads by NetChain agents. As we use consistent hashing, a NetChain agent only needs to store a small amount of data to maintain the mapping from keys to switch chains.
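The routing scheme just described can be simulated in a few lines. This is a toy model under my own assumptions: packets are plain dicts, the field names are made up, and there is no failure handling, so the extra IPs carried by a read are simply ignored.

```python
class Switch:
    def __init__(self, ip):
        self.ip = ip
        self.store = {}

    def process(self, pkt):
        """Apply the query locally, then rewrite the destination to the next hop."""
        assert pkt["dst"] == self.ip
        if pkt["op"] == "write":
            self.store[pkt["key"]] = pkt["value"]
        else:
            pkt["value"] = self.store.get(pkt["key"])
        if pkt["op"] == "write" and pkt["chain"]:
            pkt["dst"] = pkt["chain"].pop(0)  # forward along the chain
        else:
            pkt["dst"] = pkt["client"]        # tail reached: reply to the client
        return pkt

def make_query(op, key, chain_ips, client, value=None):
    # Writes carry the chain head-to-tail; reads carry it reversed, so the tail comes first.
    order = list(chain_ips) if op == "write" else list(reversed(chain_ips))
    return {"op": op, "key": key, "value": value, "client": client,
            "dst": order[0], "chain": order[1:]}

def send(switches, pkt):
    """Deliver the packet hop by hop until it is addressed to the client."""
    while pkt["dst"] != pkt["client"]:
        pkt = switches[pkt["dst"]].process(pkt)
    return pkt

chain_ips = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]  # head, middle, tail
switches = {ip: Switch(ip) for ip in chain_ips}
send(switches, make_query("write", "lock/a", chain_ips, "10.0.1.9", value="owner-1"))
reply = send(switches, make_query("read", "lock/a", chain_ips, "10.0.1.9"))
```

The write visits all three switches before the reply goes out, while the read touches only the tail, which is why a read can never observe a value that has not yet reached every replica.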

Because UDP packets can arrive out of order, NetChain introduces its own sequence numbers to serialize write queries.
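One plausible way for a replica to use such sequence numbers is sketched below. The post doesn't spell out the exact rule, so treat the per-key "apply only if newer" check and every name here as assumptions for illustration.

```python
class Replica:
    def __init__(self):
        self.value = {}
        self.seq = {}

    def apply_write(self, key, value, seq):
        """Apply the write only if it is newer than what this replica already holds."""
        if seq <= self.seq.get(key, 0):
            return False  # stale or duplicate write: ignore it
        self.value[key] = value
        self.seq[key] = seq
        return True

replica = Replica()
# Two writes to the same key arrive reordered by the network: seq 2 before seq 1.
results = [replica.apply_write("cfg", "new", seq=2),
           replica.apply_write("cfg", "old", seq=1)]
```

With this rule the late-arriving older write is dropped, so every replica ends up with the same final value regardless of arrival order.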

The end solution offers per-key read and write queries, rather than per-object. NetChain does not support multi-key transactions.

Handling failures and configuration changes in the control plane

The NetChain controller runs as a component in the network controller. Switch failures are handled in two stages. Fast failover quickly reconfigures the network to resume serving queries with the remaining nodes in each affected chain. This degraded mode can now tolerate one less failure than the original of course. Failure recovery then adds other switches as new replication nodes for the affected chains, restoring their full fault tolerance.

Fast failover is pretty simple. You just need to modify the ‘next’ pointer of the node before the failed one to skip that node:

This is implemented with a rule in the neighbour switches of the failed switch, which checks the destination IP. If it is the IP of a failed switch, then the destination IP is replaced with the next chain hop after the failed switch, or the client IP if we’re at the tail.
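Expressed as code, the neighbour-switch rule is tiny. This sketch reuses a made-up dict-based packet format (destination, remaining chain hops, client IP); none of these field names come from the paper.

```python
def failover_rewrite(pkt, failed_ips):
    """If the packet is addressed to a failed switch, skip to the next chain hop."""
    if pkt["dst"] in failed_ips:
        remaining = pkt["chain"]
        # No hops left means the failed switch was the tail, so reply to the client.
        pkt["dst"] = remaining.pop(0) if remaining else pkt["client"]
    return pkt

failed = {"10.0.0.2"}
mid_hop = failover_rewrite(
    {"dst": "10.0.0.2", "chain": ["10.0.0.3"], "client": "10.0.1.9"}, failed)
tail_hop = failover_rewrite(
    {"dst": "10.0.0.2", "chain": [], "client": "10.0.1.9"}, failed)
healthy = failover_rewrite(
    {"dst": "10.0.0.1", "chain": ["10.0.0.3"], "client": "10.0.1.9"}, failed)
```

Packets for healthy switches pass through untouched, so the rule costs nothing in the common case.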

For failure recovery, imagine a failed switch mapped to k virtual nodes. These are randomly assigned to k live switches, helping to spread the load of failure recovery. Since each virtual node belongs to f+1 chains, it now needs to be patched into each of them, which is again done by adjusting chain pointers. In the following figure, fast failover has added the blue line from N to S2, and then failure recovery patches in the orange lines to and from S3.

Before splicing in the new node, the state is first copied to it. This can be time-consuming, but availability is not affected. The switch to add the new node is done using a two-phase atomic protocol once the state is in place. To further minimise service disruptions, switches are mapped to multiple virtual groups (e.g. 100), with each group available for 99% of recovery time and only queries to one group at a time affected (paused) by the switchover protocol.

Incremental adoption and hybrid deployments

NetChain is compatible with existing routing protocols and network services and therefore can be incrementally deployed. It only needs to be deployed on a few switches initially to be effective, and then its throughput and storage capacity can be expanded by adding more switches.

NetChain offers lower level services (no multi-key transactions) and reduced per-key storage compared to full-blown coordination services. For some use cases this won’t be a problem (e.g., managing large numbers of distributed locks). In other cases, you could use NetChain as an accelerator to server-based solutions such as Chubby or ZooKeeper, with NetChain used to store hot keys with small value sizes, while traditional servers store big and less popular data.


The testbed consists of four 6.5 Tbps Barefoot Tofino switches and four 16-core server machines with 128GB memory. NetChain is compared against Apache ZooKeeper.

The comparison is slightly unfair; NetChain does not provide all features of ZooKeeper, and ZooKeeper is a production-quality system that compromises its performance for many software-engineering objectives. But at a high level, the comparison uses ZooKeeper as a reference for server-based solutions to demonstrate the performance advantages of NetChain.

In all the charts that follow, pay attention to the breaks in the y-axis and/or the use of log scales.

NetChain provides orders of magnitude higher throughput than ZooKeeper, and neither system is affected by value size (in the 0-128 byte range at least) or store size:

As the write ratio goes up, NetChain keeps on delivering maximum throughput, whereas ZooKeeper’s performance starts to drop off. (At 100% write ratio, ZooKeeper is doing 27 KQPS, while NetChain is still delivering 82 MQPS – each test server can send and receive queries at up to 20.5 MQPS, and there are four of them).

NetChain is also more tolerant of packet loss:

NetChain has the same latency for both reads and writes, at 9.7 µs query latency, and this stays constant even when all four servers are generating queries at their maximum rate. The system will saturate at around 2 BQPS. ZooKeeper meanwhile has 170 µs read latency and 2350 µs write latency at low throughput, and saturates at 27 KQPS for writes and 230 KQPS for reads.

As you add more switches to NetChain, throughput grows linearly.

The following figure shows that using virtual groups successfully mitigates most of the performance impact during failure recovery:

And finally, here’s a TPC-C new order transaction workload which allows testing transactions under different contention levels. By using NetChain as a lock server, the system can achieve orders of magnitude higher transaction throughput than ZooKeeper.

We believe NetChain exemplifies a new generation of ultra-low latency systems enabled by programmable networks.

SmoothOperator: reducing power fragmentation and improving power utilization in large-scale datacenters

April 27, 2018

SmoothOperator: reducing power fragmentation and improving power utilization in large-scale datacenters Hsu et al., ASPLOS’18

What do you do when your theory of constraints analysis reveals that power has become your major limiting factor? That is, you can’t add more servers to your existing datacenter(s) without blowing your power budget, and you don’t want to build a new datacenter just for that? In this paper, Hsu et al. analyse power utilisation in Facebook datacenters and find that overall power budget utilisation can be comparatively low, even while peak requirements are at capacity. We can’t easily smooth the workload (that’s driven by business and end-user requirements), but maybe we can do something to smooth the power usage.

Our experiments based on real production workload and power traces show that we are able to host up to 13% more machines in production, without changing the underlying power infrastructure. Utilizing the unleashed power headroom with dynamic reshaping, we achieve up to an estimated total of 15% and 11% throughput improvement for latency-critical service and batch service respectively at the same time, with up to 44% of energy slack reduction.

No more headroom and low utilisation…

There’s a maximum safe amount of power that the power infrastructure at a given datacenter can supply. Naturally, we need the peak power demand to remain under this budget. Given user facing workloads with fluctuating levels of demand (typically, diurnal patterns), it’s easy to end up with highly underutilised power budgets during the rest of the day.

The situation is made worse by the multi-level power delivery infrastructure. Facebook datacenters are composed of multiple rooms (called suites). Suites have multiple main switching boards (MSBs), each of which supplies some second-level switching boards (SBs), which further feed a set of reactive power panels (RPPs). From here the power is fed into racks, each composed of tens of servers. The power budget of each node is approximately the sum of the budgets of its children.

Power budget-fragmentation exists because servers hosting the services with synchronous power consumption patterns are grouped together under the same sub-tree of the multi-level power infrastructure. Such a placement creates rapid, unbalanced peaks with a high amplitude, which consume the power budget of the supplying power node fast.

(If you go over the budget of course, a circuit-breaker— a real circuit-breaker ;)— trips and you lose power in the whole sub-tree).

The level of power budget fragmentation and the inefficiencies in the power budget utilisation are captured in two main metrics: sum of peaks, and power slack.

  • Sum of peaks simply adds up the peak power demand (during the period of measurement, one week in this study) at each of the power nodes in the datacenter. With an unbalanced placement, each node may see a high peak at some point in time, leading to a larger ‘sum of peaks’ value overall.
  • Power-slack measures the unused power budget at a given point in time (the total power budget minus the current demand). Lower slack equals higher utilisation. Energy slack is the integral of power slack over a given timespan. Low energy slack means the power budget is highly utilised over the corresponding timespan.
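Both metrics are simple to compute from power traces. The sketch below uses made-up traces (kW, one sample every six hours) for two power nodes, and approximates the energy slack integral with a rectangle rule; the function names are mine, not the paper's.

```python
def sum_of_peaks(traces):
    """Add up the peak demand observed at each power node over the window."""
    return sum(max(trace) for trace in traces.values())


def power_slack(budget, trace):
    """Unused budget at each sample: the budget minus the demand at that time."""
    return [budget - demand for demand in trace]


def energy_slack(budget, trace, step_hours):
    """Rectangle-rule approximation of the integral of power slack over time."""
    return sum(power_slack(budget, trace)) * step_hours


# Hypothetical instances: 'web' peaks by day, 'db' peaks at night.
web = [20, 50, 50, 20]
db = [50, 20, 20, 50]


def combine(*instance_traces):
    """Demand at a node hosting the given instances, sample by sample."""
    return [sum(samples) for samples in zip(*instance_traces)]


# Unbalanced: like with like. Balanced: one of each under every node.
unbalanced = {"node_a": combine(web, web), "node_b": combine(db, db)}
balanced = {"node_a": combine(web, db), "node_b": combine(web, db)}

print(sum_of_peaks(unbalanced), sum_of_peaks(balanced))
print(power_slack(100, unbalanced["node_a"]))
print(energy_slack(100, unbalanced["node_a"], step_hours=6.0))
```

With these numbers the unbalanced placement has a sum of peaks of 200 against 140 for the balanced one, even though the total energy consumed is identical.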

Workload aware placement and remapping

The basic idea is easy to grasp. Instead of co-locating services with synchronised power demands (leading to high peaks and low troughs), strive for a balanced mix of power demands under each power node, to smooth things out.

Take a look at the diurnal power demand patterns for three major Facebook services: web clusters, db clusters, and hadoop. These reveal the opportunity for blending.

The web clusters serve end user traffic and are one of the largest consumers of power budget. The servers in these clusters have highly synchronous power patterns, following the typical user activity level.

The db servers are shielded by caching layer(s) and are more I/O-bound, so even when the front-end servers are experiencing heavy load they do not exhibit high power consumption. At night though, when performing the daily backup, they do a lot of data compression, which drives high power usage. Thus the db servers also have a diurnal pattern, but their peaks occur during the night.

Hadoop clusters are optimised for high throughput, but are not tied to any direct user interaction. Their power consumption is constantly high.

A service is a collection of hundreds to thousands of server instances (interesting side note, all deployed directly as native processes – no VMs or containers in sight). Even within the same service, there can be significant amounts of instance-level heterogeneity in power demands. This usually stems from imbalanced access patterns or skewed popularity amongst different service instances.

The SmoothOperator placement framework aims to take all of this into account to find power-balanced instance placements. It can also monitor the datacenter over time to make fine-tuned adjustments if things start to move out of balance. There are four main stages to the SmoothOperator workflow:


  1. Collecting traces (over multiple weeks) for each service instance. These are also combined to create overall service power traces for each of the top power-consuming services. The service power traces capture the in-aggregate behaviour of the service instances.
  2. Calculating asynchrony scores. Each service instance is compared against the overall service power trace for each of the services. The result is an n-dimensional vector (where n is the number of services) in which each element indicates how aligned the power requirements of the instance are with that service. A low asynchrony score indicates that the power demands move closely together; a high asynchrony score indicates that they are more independent.
  3. We take the asynchrony score vectors and run a clustering algorithm to find clusters that exhibit largely synchronous power consumption.
  4. The clustering results are used to guide workload placement. The goal is to have high asynchrony scores at all levels of power nodes, which maximises the power headroom and mitigates fragmentation.
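The formula for the score is not spelled out above, so the following sketch assumes one plausible definition: the sum of the individual peaks divided by the peak of the combined trace. Under that assumption two traces that peak together score 1.0, and the score rises as their peaks move apart, which matches the low/high reading in step 2. The traces and service names are invented, and real traces would need putting on a comparable scale first.

```python
def asynchrony_score(trace_a, trace_b):
    """Assumed definition: sum of individual peaks over the peak of the sum."""
    combined = [a + b for a, b in zip(trace_a, trace_b)]
    return (max(trace_a) + max(trace_b)) / max(combined)


def score_vector(instance_trace, service_traces):
    """Score one instance against every service trace (one entry per service)."""
    return {
        name: asynchrony_score(instance_trace, trace)
        for name, trace in service_traces.items()
    }


# Hypothetical service-level traces, all on the same scale.
services = {
    "web": [20, 50, 50, 20],     # daytime peak
    "db": [50, 20, 20, 50],      # night-time peak
    "hadoop": [45, 45, 45, 45],  # constantly high
}

web_like_instance = [20, 50, 50, 20]
scores = score_vector(web_like_instance, services)
for name, score in scores.items():
    print(f"{name}: {score:.2f}")
```

A web-like instance scores 1.0 against the web trace and about 1.43 against the db trace, so it is the db servers you would want it sitting next to. The flat hadoop trace also scores 1.0 here: a constant load does nothing to fill in the troughs.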

Placement works as follows. Starting at the top level in the multi-level power delivery system, we know that we need to divide service instances across q second-level nodes. We use k-means clustering to obtain h clusters, where h is a multiple of q. An additional constraint in this process is that each cluster must have the same number of instances (which must result in some strained clusters!). Now we simply iterate through all the clusters and assign 1/q of the service instances in each to each second-level power node. We can repeat this process at each level of the hierarchy until the leaf nodes are reached.
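The distribution step for one level of the hierarchy can be sketched as below. It assumes the equal-sized clusters have already been produced (the constrained k-means step is not shown), and simply deals each cluster out round-robin so that every child node receives roughly 1/q of it. The function name and the instance ids are hypothetical.

```python
def spread_clusters(clusters, q):
    """Deal the members of each cluster round-robin across q child power nodes."""
    nodes = [[] for _ in range(q)]
    for cluster in clusters:
        for i, instance in enumerate(cluster):
            nodes[i % q].append(instance)
    return nodes


# Two hypothetical clusters of instances with synchronous power patterns.
clusters = [
    ["web1", "web2", "web3", "web4"],
    ["db1", "db2", "db3", "db4"],
]

for index, members in enumerate(spread_clusters(clusters, q=2)):
    print(index, members)
```

Each child node ends up with half the web instances and half the db instances. To walk down the hierarchy you would re-cluster the instances assigned to each child and call the same function again with that child's fan-out.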

Here you can see the smoothing of power demand achieved by the algorithm at a sample node in the hierarchy:

There are three datacenters involved in the study. For each of these datacenters, the following chart shows the peak-power reduction achieved at each level in the datacenter (from suites all the way down to the reactive power panels).

Our workload-aware service instance placement can reduce the peak power by 2.3%, 7.1%, and 13.1% for the three datacenters respectively.

(The effects are less pronounced as you move up the hierarchy because you naturally include more instances in your ‘sample’, which leads to more heterogeneity).

What should we do with the new servers we can now squeeze in?

With the peak power demands reduced, we can now squeeze more servers into our datacenters! But where should we put them, and what should they do? If we dedicate them to latency critical workloads (e.g., web) we’ll be able to handle more traffic at peak, but they’ll be under-utilised off peak. It would be better if we could use the new servers to help with latency critical workloads under peak loads, and batch throughput during off-peak hours. (We seem to be rediscovering the idea of auto-scaling here!!).

Facebook has recently been widely deploying storage-disaggregated servers, in which the main storage components (e.g. flash memory) are accessed over the network. (I.e., a move from directly-attached storage to network-attached storage). Fast networks are what make this approach viable. With your storage subsystem decoupled from your compute, it’s easy to switch the service a given node is running because data remains intact and accessible from anywhere. (I can almost hear Google saying ‘I told you so…’ 😉 ).

The main novelty here, then, is switching the job that a given server is handling based on power-budget considerations. When the average power load level of the latency-critical services falls below a threshold, some of their servers are converted to run batch workloads (and vice versa). Throughput is further maximised by throttling the power consumption of batch clusters during peak hours.
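A threshold policy of this kind might look something like the sketch below. The thresholds, the one-server-at-a-time step, and the role labels are all assumptions for illustration; neither the conversion rate nor the batch throttling mechanism is described here, so throttling is left out.

```python
def reshape(roles, lc_load, low=0.4, high=0.8):
    """Return new roles for the convertible servers given the LC power load.

    roles   -- list of "lc" or "batch", one entry per convertible server
    lc_load -- average power load of the latency-critical servers, 0.0 to 1.0
    low/high are hypothetical thresholds; one server is converted per call.
    """
    roles = list(roles)  # leave the caller's list untouched
    if lc_load < low and roles.count("lc") > 1:
        # Off-peak: hand one latency-critical server over to batch work.
        roles[roles.index("lc")] = "batch"
    elif lc_load > high and "batch" in roles:
        # Approaching peak: reclaim one batch server for latency-critical work.
        roles[roles.index("batch")] = "lc"
    return roles


print(reshape(["lc", "lc", "batch"], lc_load=0.2))
print(reshape(["lc", "batch", "batch"], lc_load=0.9))
print(reshape(["lc", "lc"], lc_load=0.6))
```

Keeping a gap between the two thresholds stops servers flapping between roles when the load hovers around a single cut-off.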

Across the three datacenters, this policy improves latency critical workload throughput by (an additional) 7.2%, 8%, and 1.8% respectively: a capacity gain that can accommodate many millions of extra queries per second.

The overall slack measurements give us an indication of improvement at the datacenter level. Datacenter 3 shows the least improvement since it has a heavier skew towards latency critical services (meaning less batch workload is available for balancing).

The dynamic power profile reshaping achieves 44%, 41%, and 18% average power slack reduction, respectively in the three datacenters.