We haven’t forgotten about other models – honest!

Disclaimer: The Maestro project has been renamed. While this article refers to it as 'Maestro', going forward the project is referred to by the codename 'Axum'.

Copied from 'Parallel Programming with .NET' 

When discussing parallel computing and managed code, for the most part we’ve stuck to talking about a few parallel paradigms such as task parallelism (TPL) and data parallelism (PLINQ) (our brethren at the http://blogs.msdn.com/stmteam blog have also discussed some specific mechanisms for ensuring safety in parallel applications). Mostly, this has been due to our focus on what we’re shipping next. The parallel support in Visual Studio 2010 has always been focused on “low-hanging fruit”: how do we enable developers to quickly take advantage of multi-core and manycore in both their legacy applications and new applications? How do we make it easy for them to express the parallelism in their applications? That doesn’t mean we haven’t been thinking about and working on other models – models that may require a larger conceptual jump but may be better suited for architecting applications for parallelism from the ground up.

The constructs we’re providing in Visual Studio 2010 are most useful in shared-memory systems where multiple threads operate concurrently on shared state. Often, this type of concurrency is OK. For simple applications, the dependencies between threads can be manageable, allowing those apps to scale well on today’s machines. However, when we start to parallelize complex applications for tens or hundreds or thousands of cores, these dependencies become far too complex to manage and, regardless of complexity, the contention caused by so many threads fighting for access to the same state severely limits scalability. We like to learn by example, and surely there must be other successful applications that are complex and scale well to a large number of CPUs.

You betcha; you’re lookin’ at it.

Honestly, what’s more successful, parallel, and scalable than the web? Given that we may someday see 1,000+ cores on our desktops, wouldn’t it be great if we could shrink the web’s architecture down onto a single multi-core computer? The web blends a lot of different paradigms and models, and the jargon tossed around can make your head spin. So I’ll keep it simple and talk about a few of the properties that make the web such a great parallel application:

Isolation

Each component that makes up the web is an isolated machine that works in parallel with other components but does not implicitly share resources – specifically, memory. Any component that wishes to use another component’s resources must do so via an explicit request, typically over a very formal and narrow channel. There are pros and cons to this restriction.

The negative aspect of this is performance: when a node needs access to the resources of another node, it can’t simply take them on demand; it must wait until its request has been processed.

The positive aspect of isolation is how it helps reduce complexity. As I noted earlier, managing the dependencies between components in a large, complex system can be extremely difficult, and this difficulty can severely hinder developer productivity. The great emergent property of this restriction is that it eliminates implicit data dependencies, reducing the surface area that components can depend on. This drastically reduces the complexity not only for client components, but for server components as well. When components are isolated, they are the exclusive owners of their state and can read and write that state without fear that it may be changed erratically by another component.


Figure 1. Example dependency graphs of a concurrent application in a model which allows implicit dependencies (left) vs. a concurrent application in a model which disallows implicit dependencies (right). Each rectangle represents code acting on shared state.

Message-passing

Message-passing is really an artifact of isolation, and it’s a communication mechanism that provides many benefits to massively parallel applications. The difference between message-passing and shared-state communication is equivalent to the difference between sending a colleague an e-mail asking her to complete a task and opening up her organizer to write the task directly into her to-do list. More than just being rude, the latter is likely to confuse her – she might erase it, not notice it, or accidentally prioritize it incorrectly. The former is a bit gentler: she can process the e-mail when she’s ready, and because she received the message in a standard, formal manner, she’s more likely to use the information correctly. This is much like how the web works. When a component needs to communicate state to other components or needs access to a resource, it sends this information in a message.

There are both beneficial and detrimental aspects to message-passing. Typically, to communicate state outside of an isolation boundary, the component passing the information must copy it, potentially introducing a significant amount of overhead, particularly if the data is large. If, however, the overhead is not prohibitively expensive, message-passing is a very scalable means of communication between concurrent components. To understand why, let’s first consider the alternative. Let’s imagine that there are five parallel components (threads in this case) that each need to perform 100 complex calculations and add each resulting integer to a shared list. The caveat is that the list must maintain a sorted order. The code might look something like the following:

using System;
using System.Collections.Generic;
using System.Threading;

class Program
{
    static void Main()
    {
        // create a list and add something to it
        LinkedList<int> sortedList = new LinkedList<int>();
        // thread synchronization...
        ManualResetEvent mre = new ManualResetEvent(false);
        int count = 5;
        object mutex = new Object();
        // 5 threads each add 100 items to the list in sorted order   
        for (int t = 0; t < 5; ++t)
        {
            Thread thread = new Thread(new ThreadStart(() =>
            {
                // using the Random class in a multi-thread environment is not
                // guaranteed to give truly random numbers, but it’s ok
                // for this example   
                Random r = new Random();

                for (int i = 0; i < 100; ++i)
                {
                    // emulate a complex calculation
                    for (int z = 0; z < 1000000; ++z) ;
                    int num = r.Next();

                    // insert the item in sorted order with mutual exclusion
                    lock (mutex)
                    {
                        LinkedListNode<int> curNode = sortedList.First;
                        while (true)
                        {
                            // reached the end of the list (or the list is
                            // empty): every existing value is smaller than
                            // num, so it belongs at the back
                            if (curNode == null)
                            {
                                sortedList.AddLast(num);
                                break;
                            }
                            // found the first value >= num: insert in front
                            // of it to keep the list sorted
                            if (curNode.Value >= num)
                            {
                                sortedList.AddBefore(curNode, num);
                                break;
                            }
                            curNode = curNode.Next;
                        }
                    }
                }
                // if we're the last thread to finish, set the MRE
                if (Interlocked.Decrement(ref count) == 0) mre.Set();
            }));
            thread.Start();
        }
        mre.WaitOne();
        foreach (int val in sortedList)
        {
            Console.WriteLine(val);
        }
        Console.ReadLine();
    }
}

Figure 2. Example of a shared-state, parallel algorithm.

Note that in this example it’s necessary to use a lock around all of the code that must analyze and modify the data atomically. In effect, after the first five “complex calculations” complete, each thread halts progress as it blocks, waiting for exclusive access to the lock. As shown in the figure below, this results in a large amount of time spent waiting that, instead, could be spent processing more items and reducing the overall time to completion.


Figure 3. Allocation of execution time for the example parallel algorithm using shared-state.


Figure 4. Screen clip of Thread Blocking View (Visual Studio 2010) analysis of example program.

What’s particularly bad about this situation is how it behaves when a lot of threads are added. Notice how the longest red block (time wasted on waiting) is directly proportional to the number of threads. To be more precise, the last thread to acquire the lock will spend roughly (n-1) x c milliseconds waiting, where n is the number of threads waiting on the lock and c is the time it takes to update the data structure. For example, with 50 threads and a 2 ms update, the last thread waits roughly 49 x 2 = 98 ms. If there were 10 times as many threads, the longest waiting period would be 10 times as long, adding a significant amount to the overall time to completion. This design also has the potential to adversely affect responsiveness. Imagine if the thread on the far left of Figure 3 were a Windows UI thread and the green chunks represented time spent processing user input. All the yellow and red times are times when the UI would be non-responsive, possibly grayed-out and stuck with a nasty hourglass cursor.

Now let’s take the same scenario and use a hypothetical message-passing technology. Typically, message-passing technologies have the notion of a mailbox – essentially a thread-safe queue – for asynchronously passing messages and buffering them in order. So now we re-architect our application such that there is one parallel component, complete with mailbox, that is the sole owner of the list and the only component that makes updates to it. We create five other components that each perform 100 complex calculations in parallel (just like in the shared-state version), except that this time, instead of waiting to acquire exclusive access to the list, they simply place each result into the list component’s mailbox and continue calculating the next number. Now we’ve moved the contention in the application strictly to the list component’s mailbox. Fortunately for us, a thread-safe queue can be implemented lock-free such that a single enqueue could translate to as little as a single interlocked operation. This significantly reduces the amount of time wasted contending for resources, and the synchronization overhead (not including the cost of copying the message) typically impacts performance and scalability far less.
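
To make that concrete, here’s a minimal sketch of the same workload restructured around a mailbox. It’s only an approximation of the hypothetical message-passing technology, not Maestro code: .NET 4’s ConcurrentQueue<T> (a lock-free, thread-safe queue) stands in for the mailbox, and the main thread plays the role of the component that exclusively owns the list.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

class MailboxExample
{
    const int Producers = 5, ItemsPerProducer = 100;

    static void Main()
    {
        // the "mailbox": a thread-safe queue shared by all producers
        ConcurrentQueue<int> mailbox = new ConcurrentQueue<int>();

        // five producers calculate and post results; they never wait for
        // access to the list, only for their own next calculation
        for (int t = 0; t < Producers; ++t)
        {
            new Thread(() =>
            {
                // seed per thread so the threads don't share a sequence
                Random r = new Random(Thread.CurrentThread.ManagedThreadId);
                for (int i = 0; i < ItemsPerProducer; ++i)
                {
                    // emulate a complex calculation
                    for (int z = 0; z < 1000000; ++z) ;
                    mailbox.Enqueue(r.Next()); // "send" the result and move on
                }
            }).Start();
        }

        // the consumer is the sole owner of the list, so the insertion
        // logic needs no lock at all
        LinkedList<int> sortedList = new LinkedList<int>();
        int received = 0;
        while (received < Producers * ItemsPerProducer)
        {
            int num;
            if (!mailbox.TryDequeue(out num)) { Thread.Yield(); continue; }
            ++received;

            LinkedListNode<int> curNode = sortedList.First;
            while (curNode != null && curNode.Value < num)
                curNode = curNode.Next;
            if (curNode == null) sortedList.AddLast(num);
            else sortedList.AddBefore(curNode, num);
        }

        foreach (int val in sortedList) Console.WriteLine(val);
    }
}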


Figure 5. Allocation of execution time for the example parallel algorithm using message passing.

Notice now how scalable our algorithm is. While there is still some contention, using an Interlocked operation has effectively reduced the amount of work that needs to be done atomically down to a single hardware instruction, enabling us to add many more cores to the algorithm while still reducing the total time to completion.
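
To see why an enqueue can cost as little as one interlocked instruction, here’s an illustrative sketch built on Interlocked.CompareExchange. It’s deliberately a Treiber-style LIFO stack rather than the FIFO queue a real mailbox needs (FIFO takes a bit more machinery), but the hot path has the same shape: on an uncontended push, a single interlocked instruction publishes the new node.

using System.Threading;

// Illustration only: a lock-free LIFO stack. In .NET, the garbage collector
// conveniently sidesteps the ABA problem that a native implementation of
// this pattern would have to deal with.
class LockFreeStack<T>
{
    private class Node
    {
        public T Value;
        public Node Next;
    }

    private Node _head;

    public void Push(T value)
    {
        Node node = new Node { Value = value };
        Node oldHead;
        do
        {
            oldHead = _head;       // snapshot the current head
            node.Next = oldHead;   // link the new node in front of it
        }
        // publish the node; retry only if another thread raced us
        while (Interlocked.CompareExchange(ref _head, node, oldHead) != oldHead);
    }

    public bool TryPop(out T value)
    {
        Node oldHead;
        do
        {
            oldHead = _head;
            if (oldHead == null) { value = default(T); return false; }
        }
        while (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) != oldHead);
        value = oldHead.Value;
        return true;
    }
}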

Fault Tolerance

Much of the web’s success is owed to its resiliency, in turn owed to isolation. In a shared-memory system, a rogue thread has the ability to corrupt state willy-nilly, potentially corrupting the entire process. Detecting, dealing with, and correcting this type of failure can be incredibly difficult if not impossible. In an isolated system where message-passing is used to update state, the most damage a component or thread can do is corrupt its own state. If a single component goes down, other components that depend on it may detect the failure via messages or timeouts and either reroute their dependencies or restart the component that failed.
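
As a rough sketch of that last point (the names here are invented for illustration; this is not a Maestro mechanism), a supervising component can detect a failed worker purely from the absence of its heartbeat messages and restart it, confident that whatever corrupted the worker could not have leaked across the isolation boundary:

using System;
using System.Threading;

// Illustration only: timeout-based failure detection between isolated
// components. The supervisor never touches the worker's state; it only
// observes the worker's outbound heartbeats.
class Supervisor
{
    private long _lastHeartbeatTicks = DateTime.UtcNow.Ticks;

    // the worker's sole outward communication: "I'm still making progress"
    public void Heartbeat()
    {
        Interlocked.Exchange(ref _lastHeartbeatTicks, DateTime.UtcNow.Ticks);
    }

    // watch for silence and restart the worker when it exceeds the timeout
    public void Monitor(Action restartWorker, TimeSpan timeout)
    {
        while (true)
        {
            Thread.Sleep(timeout);
            long last = Interlocked.Read(ref _lastHeartbeatTicks);
            if (DateTime.UtcNow.Ticks - last > timeout.Ticks)
            {
                restartWorker(); // a fresh worker; no shared state to repair
                Interlocked.Exchange(ref _lastHeartbeatTicks, DateTime.UtcNow.Ticks);
            }
        }
    }
}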

Especially in the face of asynchronous operations, shared-memory parallel applications struggle with failure handling. Often, exceptions are propagated up the stack of a thread that does not have enough context to make a proper decision when an exception arrives. With message-passing, failures can easily be routed to the components that know how to handle them best.

Loose Coupling

With shared-state models it can be difficult to architect applications for reusability and maintainability. Not only are the dependencies complex and difficult to manage correctly, they also may impede a developer’s ability to cleanly separate components. With message-passing and isolation, much like the web, components interact with each other through formal, narrow channels with a very specific protocol. These channels are similar to what interfaces are to classes in the OO world. As long as a developer programs to the component’s interface and adheres to the protocol, the implementation can change with minimal impact on other components.
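
As a loose OO analogy (the names below are invented for illustration, and this is plain C#, not Maestro syntax), a channel boils the contract between two components down to message types and a single narrow operation:

// The client can only know the message types and the direction they flow.
sealed class GetStateRequest { }
sealed class GetStateReply { public string State; }

interface IStateGetterChannel
{
    GetStateReply Send(GetStateRequest request);
}

// The implementation owns its state exclusively and can change freely
// (caching, sharding, remoting, ...) without breaking any client that
// programs only against IStateGetterChannel.
class StateGetterComponent : IStateGetterChannel
{
    private string _state = "ready";

    public GetStateReply Send(GetStateRequest request)
    {
        return new GetStateReply { State = _state };
    }
}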

I don’t want to hand-wave and take credit for reaping the benefits of the web’s architecture on multi-core systems; none of these concepts are new – not even to Microsoft. Erlang is a language that has been around for years and has efficiently run many large, incredibly scalable telco networks. MPI is arguably one of the most-used technologies for message-passing in the highly concurrent distributed-systems world. At Microsoft’s very own Robotics group, the CCR was developed to bring the powerful scalability, parallelism, and fault tolerance of message passing to robots. It’s been so successful there that it’s found its way into high-visibility parallel applications like MySpace. This applies to native code just as it does to managed: we’ve brought some of these concepts to the Parallel Pattern Library (PPL) in the form of Asynchronous Agents.

While each of these technologies has seen some degree of success, they all lack a little something that makes parallel programming safe and productive. Erlang, for example, has a syntax that, while succinct, is difficult for the average developer; and MPI, the CCR, and Asynchronous Agents lack the very thing that makes the web so beautifully safe: isolation.

What we need is a technology that wraps all of these concepts into a nice package – and we’re working on it. To get both safety and productivity, we think a good way to go is with a language – whether that means a new language altogether or extensions to an existing one, we’re not quite sure yet. But in the interest of verifying that we can actually provide the value we claim we can, we’ve been incubating a language called Maestro, which you can learn all about on Channel 9.


Check back frequently – both here and on Channel 9 – to see what we’re working on in this space.

Posted: Friday, February 27, 2009 7:00 AM by phillips.joshua

Comments

phillips.joshua said:

Since we've moved the blog over from the Parallel Programming with .NET blog, I've copied all the comments (22) here:

-- Josh

--------------------------

Judah Himango said:  

Informative post, Joshua, thanks for this.

I'm interested to see where Maestro will go. Will it be like Spec#, where it starts out as a language but gets merged into a library available for all .NET languages?

Or will we have a future where we use Maestro in combination with our existing C# codebase?

Will be interesting to see where this goes. Keep the posts (and C9 vids) coming!

---

# February 12, 2009 12:28 AM

Dmitriy V'jukov said:  

Re: Fortunately for us, a thread-safe queue can be implemented lock-free such that a single enqueue could translate to as little as a single interlocked operation

Just one question: are you going to maintain per-agent queue sizes in Maestro?

---

# February 13, 2009 9:48 AM

phillips.joshua said:  

Judah,

Thanks for the questions.  As an incubation project, Maestro could end up anywhere: as its own language, as extensions to an existing language, or even on the cutting room floor.  I'd recommend watching the Channel 9 video.  We talk a bit about the language vs. library thing.  In fact, Maestro is really built on top of the CCR (a library for coordination and message-passing).  Maestro is really about making concurrent programming easier by making it safer, and we feel that a language provides us with the best tools for this, i.e. static verification that you're not breaking the no-implicit-dependencies model.

In its current form, Maestro is a separate language that is object-aware but won't allow you to define classes.  As such, it's nearly impossible to do anything non-trivial without using C# or another language to define your objects.  Maestro is really about coordination -- we leave all the OO stuff to the other languages.

Dmitriy,

I'm sorry, I'm not understanding the question. Can you explain what per-agent queue sizes are?  

Thanks!

---

# February 13, 2009 2:22 PM

dvyukov said:  

In a message-passing system like Maestro there must inevitably be message queues. Queues can be attached to agents/actors, or probably to Domains in Maestro. Basically, will these queues in Maestro have a 'GetLength()' member? Will it be possible to determine the current queue length at run-time?

--

Dmitriy V'jukov

---

# February 14, 2009 6:09 AM

Luca Minudel said:  

The main design guideline for simplifying the programming of multi-threaded applications is to decouple the business logic (application programming) from the code that manages threads, locking, synchronization, ... (system programming)

That way you can use commercial, off-the-shelf components for the multi-threading.

Do you plan to deliver components that implement the most common multi-threading service models (say Thread-per-Request, Thread-per-Session, Thread-Pool, ...)?

---

# February 15, 2009 8:28 AM


phillips.joshua said:  

Dmitriy,

Maestro does have a series of interaction points (message queues) that have a Count property on them.  In Maestro, you define channels, which are collections of ports that are transformed into these interaction points.  Of course, in a concurrent application, the sizes of these ports aren't of much use, as the information can become stale as soon as you retrieve it. Just out of curiosity, how would you use the queue count?

Luca,

Maestro is a language that abstracts threads into agents.  Agents, depending on their privileges, will either run serially or in parallel inside of a given domain.  As such, while Maestro doesn't have explicit constructs that implement different multi-threading service models, it does make it easy to support such models.  For example, say you had a service that simply communicated some sort of state and you wanted it to do so on a thread-per-request basis.  

In super-simplified pseudo-Maestro code, that would look something like this:

domain MyService
{
   State _theState;

   MyService()
   {
      IHost httpHost = GetHost("http");
      httpHost.Host<StateGetterAgent>("http://myservice");
   }

   channel StateGetterChannel
   {
      input GetState : State;
   }

   reader agent StateGetterAgent : StateGetterChannel
   {
      while ( StillAlive() )
      {
         // get request
         var request = receive(PrimaryChannel::GetState);
         // send response
         request::Reply <-- _theState
      }
   }
}

Every time a client accessed http://myservice, a new StateGetterAgent would be created inside the domain MyService.  Since StateGetterAgent is a reader agent, it would execute in parallel with all other reader agents. I hope that helps.

Thanks both for the great questions!

---

# February 16, 2009 12:48 PM

Dmitriy V'jukov said:  

Re: Just out of curiosity, how would you use the queue count?

I'm thinking about overload control. The idea is that it's impossible to create robust and efficient systems based on async message passing if there are no means to detect and control load at run-time.

Here is a very demonstrative example:

http://blogtrader.net/page/dcaoyuan/entry/a_case_study_of_scalable

A Count() method on queues provides the minimum needed to detect and control load - at the least, an agent can check a target agent's queue size and, if it's too big, spin until the queue size drops below a low watermark.
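
In plain C# terms (invented names; this is neither Maestro nor SObjectizer API), such a sender-side policy might look like:

using System.Collections.Concurrent;
using System.Threading;

static class Backpressure
{
    const int HighWatermark = 10000;
    const int LowWatermark = 1000;

    // suspend the sender while the target's mailbox is overloaded; resume
    // only after the consumer has drained it below the low watermark
    public static void SendWithLoadControl<T>(ConcurrentQueue<T> targetMailbox, T message)
    {
        if (targetMailbox.Count >= HighWatermark)
            SpinWait.SpinUntil(() => targetMailbox.Count < LowWatermark);
        targetMailbox.Enqueue(message);
    }
}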

What caught my eye is that you said the enqueue operation is just one interlocked operation. Are you using DWCAS (double-word CAS)? Or some more advanced techniques?

Btw, are you going to provide some built-in means for overload control? While Erlang provides the ability to query a process' queue size, it's neither efficient (the user is unable to implement efficient load control mechanisms based only on a Count() method) nor convenient (the user won't implement any load control mechanisms until some bad things happen on a client site). So my point is that an async message passing library has to provide well-developed built-in means for load control. Here I've described some of my thoughts on this (it's in Russian, but you can read just the code - it's relatively self-explanatory):

http://groups.google.com/group/sobjectizer/tree/browse_frm/thread/b8b89bbd80cf8e7a/b6a0e1fdc1dcaff3

---

# February 16, 2009 2:59 PM

rednael said:  

Please, also read the following article:

http://blog.rednael.com/2009/02/05/ParallelProgrammingUsingTheParallelFramework.aspx

It's an article about basic parallel programming. Examples in C# .NET included. Also, it describes a lightweight parallel framework to work with tasks. As opposed to some other frameworks, this one is very light and very easy to use.

After reading this article, you should be able to write code using parallelism.

Regards,

Martijn

---

# February 16, 2009 4:27 PM

phillips.joshua said:  

Dmitriy,

Throttling is a bit of a double-edged sword.  It is indeed important for creating a robust system, but on the other hand, blocking a producer when a consumer cannot accept data may halt progress and can poison the responsiveness Maestro strives to maintain.

As such, the ports defined on a channel can only be unbounded.  However, as you'll hear in the Channel 9 video, Maestro is built on top of a runtime that is in fact a modified version of the CCR.  This runtime contains a number of interaction points that each behave a little differently.  In fact, when you define a port on a channel, it's transformed into an OrderedInteractionPoint<T> that is unbuffered.  There is also an UnorderedInteractionPoint<T>, a SingleItemInteractionPoint<T>, and a WriteOnceInteractionPoint<T>.  Since all of these are both sources and targets, you can send and receive to them just as you would a standard Maestro port. Additionally, the runtime also includes an interface (IInteractionPoint<T>) that you can use to build your own interaction point, including a bounded interaction point that supports throttling.

So while we don't have a blocking queue built in, throttling is supported via some other patterns, such as SingleItemInteractionPoint<T>, and it will be very easy to roll your own BoundedInteractionPoint<T> for throttling producers.

---

# February 17, 2009 1:47 PM

Dmitriy V'jukov said:  

An application that has been swapped out to disk because of memory pressure, or is already dead, can't be very responsive either :)

And the problem here is that it's basically impossible to eliminate overloads "by design"; asynchronous message-passing systems are just susceptible to overload.

Replacing unbounded queues with bounded blocking queues... well, I think I won't do that until I want to show someone what a deadlock in a message-passing system looks like :) Is the queue interface general enough to allow a developer to capture the whole graph of blocked agents? Even if so, I would have to periodically analyze the graph for cycles and unblock some agents... I don't think that's good advice for an application developer...

What I am talking about is a much more flexible mechanism. An agent can choose one of the predefined policies: suspend the sender, drop the message, redirect the message, or enqueue the message regardless of load. Also, the sender can affect the policy by setting "don't drop my messages" or "don't suspend me on overload" options. The default overload criterion is based on high and low watermarks (which can be set globally or on a per-agent basis). If the default policies don't fit, the developer is free to implement an overload control policy manually, but still take advantage of the automatic deadlock prevention mechanism, etc. The system provides several places to hook in; for example, in the case of overload, an agent can periodically scan its own queue and drop low-priority or outdated messages in order to relieve memory pressure.

You've mentioned the web/internet as an example of an async system, and network systems do have automatic overload detection and propagation. Sockets just become non-writable, and thus overload conditions propagate through the system.

---

# February 17, 2009 6:52 PM

phillips.joshua said:  

Isn't allowing developers to implement an agent's message queue as they see fit the most flexible option?

Or is your feedback more that it's inconvenient to have to do so?  Again, some of these patterns are already supported by constructs that we have: for example, "enqueue message regardless of load" is the default, you can drop newer messages using a SingleItemInteractionPoint<T>, and you can redirect messages using networks. The beauty of the interface is that you have the flexibility to implement the pattern that best supports your scenario.  For example, some applications may want to drop a message and log the drop, some may not.  Some applications may want to suspend the sender indefinitely, some may want to suspend the sender for only a small period.  Providing the interface allows the implementer to choose what's best for the situation at hand.

Most importantly, we feel pretty strongly that these semantics should not be surfaced in the language; they should only be exposed in the library, to keep the language simple.  Also, to maintain loose coupling, we have to keep the contract between two agents very simple, i.e. the direction of the data, the type of the data, and the protocol of the communication.  Just like on the web, a client doesn't care how a service deals with load, just that it does.

---

# February 18, 2009 4:44 PM

Niklas Gustafsson said:  

Another way of saying the same thing as Josh is that we want throttling behavior, such as the various options Dmitriy describes, to be a transport-layer decision, not baked into the language. In the simplest (and most efficient) transport, i.e. the default in-process transport, there is no throttling.

Anyone can extend the underlying communication mechanism by writing a communication provider; for example, we have one that uses WCF for inter-process communication between Maestro agents. WCF has all kinds of support for configuration of behavior, and we think that's the layer such things belong at. At least, that's our current thinking.

---

# February 18, 2009 4:57 PM


dvyukov said:  

Josh, Niklas,

I think it's perfectly OK to push overload control to the transport layer (anyway, it's impossible to solve all the problems simultaneously).

Nevertheless, it's still unclear to me whether the interfaces will be flexible enough to support the implementation of various overload control strategies. And I am talking NOT about the implementation of a queue; I am talking about system-wide overload control. For example, if I want to block/suspend a sender, will I be able to implement a deadlock prevention mechanism? Or will I be able to notify the sender that his message was dropped? Or will I be able to just notify the sender that he is causing overload, so that he can switch to a special working mode? The latter case also requires detecting that the overload has dispersed, in order to notify senders so they can switch back to normal working mode.

In general, these strategies require being able to capture not only the separate message, but the whole context - {message, sender, receiver, probably something else}. Does the queue interface support this?

---

# February 22, 2009 10:31 AM


phillips.joshua said:  

Hi Dmitriy,

Using just the interaction point you most certainly have references to the message, sender, and receiver.  What's the something else?

---

# February 23, 2009 1:51 PM


dvyukov said:  

Josh,

Thank you. OK, I get the whole picture. I don't know what the something else is yet :)

---

# February 24, 2009 1:26 AM

BajzT said:  

[1] Is it possible for messages within Maestro to survive a process failure/restart/crash since this is not currently supported in the CCR?

[2] If messages are persisted what is the persistence store (SQL Server, distributed memory cache etc)?

---

# February 25, 2009 7:41 AM

phillips.joshua said:  

BajzT,

Great question.  This is somewhat related to Dmitriy's throttling issue.  To perform as well as possible, Maestro has a relatively basic message passing system.  In shared-memory applications, where failure is less likely, we want Maestro to be blazing fast, and persisting messages would deteriorate performance, perhaps unnecessarily for most applications.  Also, your second question highlights another good reason not to store messages by default: what would be the best way to store them?

As with Dmitriy's throttling problem, this can be solved by creating your own interaction point in Maestro (exactly as it can be solved in the CCR).  

Also, keep in mind that just because a feature won't be in the language doesn't mean we won't support it or even provide it.  We're intentionally keeping the language as light as possible, but we've yet to start really building up a framework around Maestro.  No promises, but you might just see a BoundedInteractionPoint<T> or a PersistentInteractionPoint<T> in the future. :)

---

# February 25, 2009 11:46 AM

BajzT said:  

Joshua,

Persistence providers in the Maestro framework for SQL Server, Service Broker, MS Velocity, MSMQ, etc. would allow people to choose the right store for their problem domain. In my scenario, for example, coordination and state are more important than pure message throughput.

My next question might be a little premature, but will it be possible to run agents on multiple servers to scale out, load-balance, and fail over message processing?

# February 27, 2009 11:56 AM

Niklas Gustafsson said:

BajzT,

WCF supports persistent messaging via MSMQ, and we will support WCF as one of the transports used for inter-process messaging in Maestro. Nothing (except performance constraints you may have) will prevent you from using WCF for in-process communication, too, so that should just work.

I think that also answers your follow-on question about scale-out. It is definitely the intent of the model to provide one programming paradigm across local and remote scenarios.

One thing that we haven't decided on is whether to build failover and load-balancing into the language or into the frameworks it relies on. I would expect that we will decide to go with the latter.

# March 2, 2009 11:56 AM


BajzT said:

Niklas,

Would it be possible to outline the current thinking on how transactions will be handled across local and remote scenarios?

# March 3, 2009 7:15 PM