With .Net 4.0 we have the "Task" object in the Task Parallel Library (TPL), which is improved in .Net 4.5, and in C# 5.0 we get the "async" and "await" keywords. We will start with the high-level "Parallel" class, then look at the old-fashioned "Thread" class, and then move to "Task" and "async/await".
- The Parallel Class
This is a higher level of abstraction. The work is partitioned ("chunked") into small sets that run on thread-pool threads, and each call blocks until all of the work has finished.
using System.Threading.Tasks; //to get the Parallel class
Parallel.For(0,max, (i) => { Calculate(i); });
Parallel.ForEach(list,(item) => { Calculate(item); });
Parallel.Invoke(
() => myTask1(),
() => myTask2(),
() => myTask3(),
() => myTask4()
);
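The snippets above use placeholder names (max, list, Calculate, myTask1...). A self-contained sketch with made-up data might look like the following; the class and method names are hypothetical. Each Parallel.For iteration writes only its own array slot, so no lock is needed, while the shared total in the ForEach case is updated with Interlocked.Add to avoid a race.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelBasics
{
    // Parallel.For: each index writes only its own slot, so the iterations never collide.
    public static long[] Squares(int max)
    {
        var results = new long[max];
        Parallel.For(0, max, i => { results[i] = (long)i * i; });
        return results;
    }

    // Parallel.ForEach: the shared total is updated atomically.
    public static long SumLengths(IEnumerable<string> words)
    {
        long total = 0;
        Parallel.ForEach(words, w => Interlocked.Add(ref total, w.Length));
        return total;
    }

    // Parallel.Invoke: runs independent actions and returns when all of them have finished.
    public static int InvokeCount()
    {
        int done = 0;
        Parallel.Invoke(
            () => Interlocked.Increment(ref done),
            () => Interlocked.Increment(ref done),
            () => Interlocked.Increment(ref done));
        return done;
    }

    public static void Main()
    {
        Console.WriteLine(Squares(10).Sum());                           // 285
        Console.WriteLine(SumLengths(new[] { "Baa", "Moo", "Oink" })); // 10
        Console.WriteLine(InvokeCount());                               // 3
    }
}
```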
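Exceptions thrown inside For, ForEach, and Invoke are collected rather than lost: For and ForEach stop starting new iterations after the first exception, and once the work that is already running has finished, all the exceptions are thrown together as a single AggregateException.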
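A small sketch (the class and method names are made up) shows two failing actions ending up in one AggregateException. Parallel.Invoke does not return until every action has finished, whether normally or by throwing, so both exceptions are reported.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelExceptions
{
    // Returns how many exceptions were collected into the AggregateException.
    public static int CountFailures()
    {
        try
        {
            Parallel.Invoke(
                () => throw new InvalidOperationException("first action failed"),
                () => throw new ArgumentException("second action failed"),
                () => Console.WriteLine("third action succeeded"));
        }
        catch (AggregateException ae)
        {
            // Each exception thrown by an action appears once in InnerExceptions.
            foreach (Exception inner in ae.InnerExceptions)
            {
                Console.WriteLine($"{inner.GetType().Name}: {inner.Message}");
            }
            return ae.InnerExceptions.Count;
        }
        return 0;
    }

    public static void Main() => Console.WriteLine(CountFailures());
}
```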
- ParallelLoopResult
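ParallelLoopState allows you to stop a loop early, and the ParallelLoopResult returned by Parallel.For tells you whether the loop ran to completion.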
using System;
using System.Threading.Tasks;
namespace Practice
{
class TasksArgs
{
private static int n = 100;
static void Main()
{
ParallelLoopResult parallelLoopResult = Parallel.For(0, n,
(int i, ParallelLoopState loopControl) =>
{
if (i > n - 2)
{
loopControl.Stop();
}
else
{
Console.WriteLine("working on " + i);
}
});
if (!parallelLoopResult.IsCompleted)
{
Console.Out.WriteLine("Loop was stopped before every iteration ran.");
}
Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
}
}
}
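loopControl.Stop() halts the loop as fast as possible: no new iterations are started. loopControl.Break() is gentler: iterations with a lower index than the one that called Break() are still run, while higher ones may be skipped, and ParallelLoopResult.LowestBreakIteration reports where the break happened.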
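The Break() guarantee can be checked with a sketch like this one (BreakDemo and the cut-off of 50 are arbitrary choices for illustration). Every iteration from 50 upward calls Break(); because all lower indices must still run, index 50 always runs and LowestBreakIteration ends up as 50, while indices 0 to 49 are all processed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BreakDemo
{
    public static (long? lowestBreak, int processedCount, bool completed) Run()
    {
        var processed = new ConcurrentBag<int>();
        ParallelLoopResult result = Parallel.For(0, 1000, (i, loopControl) =>
        {
            if (i >= 50)
            {
                // Break(): iterations below this index still run; higher ones may be skipped.
                loopControl.Break();
                return;
            }
            processed.Add(i);
        });
        return (result.LowestBreakIteration, processed.Count, result.IsCompleted);
    }

    public static void Main()
    {
        var (lowestBreak, processedCount, completed) = Run();
        Console.WriteLine($"{lowestBreak} {processedCount} {completed}"); // 50 50 False
    }
}
```

With Stop() in place of Break() there would be no such guarantee: some indices below 50 could be skipped.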
- Using "Thread" and "Thread.Start()"
- Classic Style
A simple multi-threaded example in C#.
using System;
using System.Threading;
namespace Practice
{
class Threads1
{
static void Main()
{
Thread t = new Thread(Cow);
t.Start();
for (int i = 0; i < 10; i++)
{
Console.Out.Write("Baa");Thread.Sleep(200);
}
Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
}
public static void Cow()
{
for (int i = 0; i < 10; i++)
{
Console.Out.Write("Moo"); Thread.Sleep(200);
}
}
}
}
The interleaving depends on how the threads are scheduled, so the output will not necessarily be the same each time; one run produced:
BaaBaaMooBaaMooBaaMooBaaMooBaaMooBaaMooBaaMooBaaBaaMoo
- Using Lambda functions
You can use lambda functions to define the task for a thread.
using System;
using System.Threading;
namespace Practice
{
public class Thread3
{
public static void Main(string[] args)
{
Thread pig = new Thread(() => Console.Write("Oink"));
Thread cat = new Thread(() => Console.Write("Meow"));
pig.Start();
cat.Start();
}
}
}
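Lambdas also make it easy to hand data to a thread by capturing local variables. In this sketch (names and data are made up), each thread fills its own slot of a shared array, and Join() waits for the threads so the results are complete before they are read. Note the copy of the loop variable: a "for" loop has a single variable shared by every iteration, so capturing "i" directly could let a thread see a later value.

```csharp
using System;
using System.Threading;

public static class LambdaThreads
{
    public static string[] Speak()
    {
        string[] sounds = { "Oink", "Meow" };
        var heard = new string[sounds.Length];
        var threads = new Thread[sounds.Length];
        for (int i = 0; i < sounds.Length; i++)
        {
            int slot = i; // copy: lambdas capturing 'i' itself would all share one variable
            threads[slot] = new Thread(() => heard[slot] = sounds[slot]);
            threads[slot].Start();
        }
        foreach (Thread t in threads)
        {
            t.Join(); // wait so every slot is filled before the array is read
        }
        return heard;
    }

    public static void Main() => Console.WriteLine(string.Join(" ", Speak())); // Oink Meow
}
```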
- Concurrency Issues
- The Problem
When two threads change the same data without coordination, bad things can happen: updates can be lost or data left inconsistent. When the result of an operation depends on the timing of different threads, it is called a "Race Condition".
A race condition can be solved by using locks to protect shared code; this is simple to implement but inhibits parallelism. Race conditions can also be solved without locks, but this requires more thought.
An object is said to be "thread-safe" if it can be used in parallel and not have race conditions. For example, "Console.Out" and "ConcurrentQueue" are thread-safe, but "List" is not. Read the object's documentation to determine if it's thread-safe.
Concurrent Data Collections (in System.Collections.Concurrent): ConcurrentBag<T>, ConcurrentQueue<T>, ConcurrentStack<T>, ConcurrentDictionary<TKey,TValue>, BlockingCollection<T>.
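As a sketch of why these collections help (the word list and class name are made up), several threads can count words into one ConcurrentDictionary without a lock. AddOrUpdate either inserts the initial value or applies the update delegate, retrying if another thread changed the entry first, so no increments are lost.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConcurrentCounting
{
    public static ConcurrentDictionary<string, int> Count(string[] words)
    {
        var counts = new ConcurrentDictionary<string, int>();
        Parallel.ForEach(words, w =>
            // add 1 for a new key, otherwise add 1 to the existing count
            counts.AddOrUpdate(w, 1, (key, old) => old + 1));
        return counts;
    }

    public static void Main()
    {
        var counts = Count(new[] { "baa", "moo", "baa", "oink", "baa" });
        Console.WriteLine(counts["baa"]); // 3
    }
}
```

The update delegate may run more than once under contention, so it should have no side effects beyond computing the new value.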
Creating a task takes time and memory. A rule of thumb is that, to be profitable, a task needs to consume at least 300 CPU cycles.
By default there is no guarantee about the order in which tasks run. The "TaskCreationOptions.PreferFairness" option in Task.Factory.StartNew() asks the scheduler to run tasks created earlier before tasks created later, but it is only a hint, not a guarantee.
For long-running tasks use the "TaskCreationOptions.LongRunning" option; it hints to the scheduler that the task should get its own dedicated thread instead of tying up a thread-pool worker.
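Both options are passed to Task.Factory.StartNew(). The sketch below (class and method names are hypothetical) checks where a LongRunning task ran; with the default scheduler, current .Net implementations give it a dedicated thread, so IsThreadPoolThread is false. Treat that as an implementation detail, since LongRunning is formally only a hint.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CreationOptions
{
    // LongRunning: the default scheduler currently runs this on its own (non-pool) thread.
    public static bool LongRunningUsesPoolThread()
    {
        Task<bool> task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        return task.Result;
    }

    // PreferFairness: a hint that earlier-scheduled tasks should tend to run earlier.
    public static int FairTask()
    {
        Task<int> task = Task.Factory.StartNew(() => 42, TaskCreationOptions.PreferFairness);
        return task.Result;
    }

    public static void Main()
    {
        Console.WriteLine($"LongRunning task on a pool thread? {LongRunningUsesPoolThread()}");
        Console.WriteLine(FairTask());
    }
}
```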
In the following example two threads increment and then decrement a shared static variable. In C#, "counter++" and "counter--" are not atomic (each is a separate read, modify, and write), so this creates a race condition.
using System;
using System.Threading;
public class ThreadTest2 {
public class MyJob {
public static int counter = 0;
public int repetitions=100000000;
public void runme() {
for(int i=0;i<repetitions;i++) {
counter++;
counter--;
}
Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
}
}
public static void Main(string[] args) {
MyJob myJob = new MyJob();
Thread thread = new Thread(myJob.runme);
thread.Name = "first";
MyJob myJob2 = new MyJob();
Thread thread2 = new Thread(myJob2.runme);
thread2.Name = "second";
thread.Start();
thread2.Start();
}
}
The results can be:
first: 0
second: 0
or sometimes
first: 1
second: 0
We can never be sure of the outcome. We can overcome this in several ways.
- A Solution is to use the "lock()" construct
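lock(obj) takes any reference-type object as its argument, and threads are only kept apart if they lock the same object. The test program above creates two separate "MyJob" instances, so "lock(this)" would give each thread its own lock and would not help. Because "counter" is static, we lock a private static object instead. Locking on a type object, e.g., "lock(typeof(Util))", also works when every caller uses it, but like "lock(this)" it is discouraged because code outside the class can lock the same object.
public class MyJob {
public static int counter = 0;
// one lock shared by every MyJob instance, because counter is shared (static)
private static readonly object counterLock = new object();
public int repetitions=100000000;
public void runme() {
lock(counterLock) {
for(int i=0;i<repetitions;i++) {
counter++;
counter--;
}
// printing inside the lock means the other thread cannot be mid-update
Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
}
}
}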
- Hardware Locking
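Using System.Threading.Interlocked.Increment() and Decrement(), the CLR performs each read-modify-write as a single atomic operation, typically implemented with a locked CPU instruction. Another interesting method in the Interlocked class is "Exchange()", which atomically stores a new value in a variable and returns the value it replaced.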
public class MyJob {
public static int counter = 0;
public int repetitions=100000000;
public void runme() {
for(int i=0;i<repetitions;i++) {
System.Threading.Interlocked.Increment(ref counter);
System.Threading.Interlocked.Decrement(ref counter);
}
Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
}
}
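The counter can no longer be corrupted: once both threads have finished it is always 0. A thread that prints while the other is between its Increment and Decrement could still report 1, but the typical output is: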
first: 0
second: 0
Hardware interlocking should be faster. While testing this you may need to have the cached memory on your system flushed. To do this use a utility from Sysinternals called "RamMap" and empty the standby list.
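Interlocked.Exchange() is handy for "only one thread may do this" flags. In the sketch below (RunOnce, TryClaim, and the caller count are made up), 100 parallel callers race to claim a flag; because Exchange writes 1 and returns the previous value in one atomic step, exactly one caller ever sees the old value 0.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RunOnce
{
    private static int claimed = 0; // 0 = free, 1 = taken

    // Atomically set the flag and report whether this caller was the one that took it.
    public static bool TryClaim() => Interlocked.Exchange(ref claimed, 1) == 0;

    public static int CountWinners(int callers)
    {
        claimed = 0;
        int winners = 0;
        Parallel.For(0, callers, _ =>
        {
            if (TryClaim())
            {
                Interlocked.Increment(ref winners);
            }
        });
        return winners;
    }

    public static void Main() => Console.WriteLine(CountWinners(100)); // 1
}
```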
- The Synchronization attribute can be used.
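This locks the entire object: only one thread at a time can run any of its methods. For performance reasons it is better to lock just the "critical section", the smallest section of code that causes an issue. The class must derive from ContextBoundObject, and the attribute depends on .Net Framework remoting contexts, so it is not available in .Net Core or .Net 5+. Because the lock is per object, it does not protect a static field such as "counter" when each thread uses its own instance.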
[System.Runtime.Remoting.Contexts.Synchronization]
public class MyJob: ContextBoundObject {
public static int counter = 0;
public int repetitions=100000000;
public void runme() {
for(int i=0;i<repetitions;i++) {
counter++;
counter--;
}
Console.WriteLine(Thread.CurrentThread.Name+": "+counter);
}
}
- Example of using Join()
Join() blocks the calling thread until the thread it is called on has finished. In this example we Join on a thread that takes 2 seconds to complete, then Join on a second thread that still has 2 seconds to go.
using System;
using System.Threading;
//Example showing use of Join()
public class ThreadTest4 {
public class MyJob {
public static int counter = 0;
public int waitfor=1;
public int finalState = -1;
//sleeps then fills in finalState to simulate work done
public void runme() {
Thread.Sleep(waitfor);
finalState = waitfor;
Console.WriteLine(Thread.CurrentThread.Name+" finished sleeping finalState: "+finalState);
}
}
public static void Main(string[] args) {
MyJob myJob = new MyJob();
myJob.waitfor = 2000;
Thread thread = new Thread(myJob.runme);
thread.Name = "first";
MyJob myJob2 = new MyJob();
myJob2.waitfor = 4000;
Thread thread2 = new Thread(myJob2.runme);
thread2.Name = "second";
thread.Start();
thread2.Start();
Console.WriteLine("After start.");
Console.WriteLine("Before first join.");
thread.Join();
Console.WriteLine("After first join.");
Console.WriteLine("Before second join.");
thread2.Join();
Console.WriteLine("After second join.");
Console.WriteLine("myJob.finalState="+myJob.finalState);
Console.WriteLine("myJob2.finalState="+myJob2.finalState);
}
}
This produces
After start.
Before first join.
first finished sleeping finalState: 2000
After first join.
Before second join.
second finished sleeping finalState: 4000
After second join.
myJob.finalState=2000
myJob2.finalState=4000
- Join() with timeout
What if a thread may never finish? Join() has an overload that takes a timeout in milliseconds; it returns true if the thread finished in time and false if it gave up waiting. The following example shows the main thread waiting 2 seconds for a job that takes 8 seconds. After 2 seconds Join() returns false, meaning the job did not finish in time. Lacking any mercy we kill the thread with Abort() (supported only on .Net Framework) and move on to wait for the second thread, which has already completed.
using System;
using System.Threading;
//Example showing use of Join()
public class ThreadTest6 {
public class MyJob {
public static int counter = 0;
public int waitfor=1;
public int finalState = -1;
//sleeps then fills in finalState to simulate work done
public void runme() {
Thread.Sleep(waitfor);
finalState = waitfor;
Console.WriteLine(Thread.CurrentThread.Name+" finished sleeping finalState: "+finalState);
}
}
public static void Main(string[] args) {
MyJob myJob = new MyJob();
myJob.waitfor = 8000;
Thread thread = new Thread(myJob.runme);
thread.Name = "first";
MyJob myJob2 = new MyJob();
myJob2.waitfor = 500;
Thread thread2 = new Thread(myJob2.runme);
thread2.Name = "second";
thread.Start();
thread2.Start();
Console.WriteLine("After start.");
Console.WriteLine("Before first join.");
bool finished = thread.Join(2000);
if(!finished) { thread.Abort(); } // .Net Framework only: on .Net Core and .Net 5+ Abort() throws PlatformNotSupportedException
Console.WriteLine("finishedP:"+finished);
Console.WriteLine("After first join.");
Console.WriteLine("Before second join.");
thread2.Join(2000);
Console.WriteLine("After second join.");
Console.WriteLine("myJob.finalState="+myJob.finalState);
Console.WriteLine("myJob2.finalState="+myJob2.finalState);
}
}
Which produces:
After start.
Before first join.
second finished sleeping finalState: 500
finishedP:False
After first join.
Before second join.
After second join.
myJob.finalState=-1
myJob2.finalState=500
- Using the "Task" object to create asynchronous threads.
In .Net 4.0 the "Task" class was introduced in "System.Threading.Tasks" as part of the Task Parallel Library (TPL). It is a higher-level abstraction than Thread: by default a task is queued to the thread pool rather than getting a dedicated thread. In .Net 4.5 "Task.Run()" was added to make things even easier; it creates and starts a task in one call.
A Task allows us to check on status, wait, grab the results when it's done and look at exceptions thrown.
- Waiting
Task task1 = Task.Factory.StartNew( lambda1 );
Task task2 = Task.Factory.StartNew( lambda2 );
Task[] tasks = new Task[] {task1, task2};
...
task1.Wait();//waits for a single task
Task.WaitAll(tasks); //waits for all tasks
...
int index = Task.WaitAny(tasks); //waits until one finishes
Console.WriteLine(task1.Status);//once finished: RanToCompletion, Canceled, or Faulted
Reading the Result property of a Task<TResult> implicitly calls Wait().
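A runnable version of the waiting calls above, with two made-up tasks (one sleeps half a second, one returns immediately), could look like this. WaitAny returns the index of the first task to finish, which is usually, but not necessarily, the fast one; after WaitAll both results are available without further blocking.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Waiting
{
    public static (int firstIndex, TaskStatus slowStatus, int sum) Run()
    {
        Task<int> slow = Task.Run(() => { Thread.Sleep(500); return 1; });
        Task<int> fast = Task.Run(() => 2);
        Task<int>[] tasks = { slow, fast };

        int firstIndex = Task.WaitAny(tasks); // index into 'tasks' of the first task to finish
        Task.WaitAll(tasks);                  // block until every task has finished
        // Result does not block here because both tasks are already complete
        return (firstIndex, slow.Status, slow.Result + fast.Result);
    }

    public static void Main()
    {
        var (firstIndex, slowStatus, sum) = Run();
        Console.WriteLine($"first finished: {firstIndex}, slow.Status: {slowStatus}, sum: {sum}");
    }
}
```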
- Composing tasks
task2 can be scheduled to run after task1 completes by using "ContinueWith".
Task task1 = Task.Factory.StartNew( lambda1 );
Task task2 = task1.ContinueWith( (antecedent) =>
lambda2 /* which can access antecedent.Result*/);
Task.Factory.ContinueWhenAll(tasks, (completedTasks) =>
{ /* code to run when all tasks have finished */ });
Task.Factory.ContinueWhenAny(tasks, (firstTask) =>
{ /* code to run when one task has finished */ });
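A runnable sketch of ContinueWith and ContinueWhenAll, with made-up values in place of lambda1 and lambda2, might look like this. A continuation only runs after its antecedents have finished, so reading Result inside it never blocks.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class Composing
{
    public static int Chain()
    {
        Task<int> task1 = Task.Run(() => 20);
        // task2 starts only after task1 completes and receives it as 'antecedent'
        Task<int> task2 = task1.ContinueWith(antecedent => antecedent.Result + 1);
        return task2.Result;
    }

    public static int SumWhenAll()
    {
        Task<int>[] tasks = { Task.Run(() => 1), Task.Run(() => 2), Task.Run(() => 3) };
        // runs once every task in the array has finished
        Task<int> total = Task.Factory.ContinueWhenAll(tasks,
            completedTasks => completedTasks.Sum(t => t.Result));
        return total.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Chain());      // 21
        Console.WriteLine(SumWhenAll()); // 6
    }
}
```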
After a task is started, the calling thread keeps running; it only blocks when it reaches "task.Wait()" or reads "task.Result".
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Practice
{
class Tasks
{
static void Main()
{
string favoriteAnimal = "Arctic Fox";
Task task = Task.Run(() =>
{
Thread.Sleep(2000);//2 seconds, so the first WriteLine below runs before the task changes the value
favoriteAnimal = "Owl";
});
Console.Out.WriteLine("favoriteAnimal = {0}", favoriteAnimal);//Arctic Fox
task.Wait(5000);//wait for task to complete or 5000 ms timeout
Console.Out.WriteLine("favoriteAnimal = {0}", favoriteAnimal);//Owl
Console.Write("Press 'Enter' to exit.");Console.In.ReadLine();
}
}
}
- Exceptions
When a task throws an exception, the task is terminated and the exception is wrapped in an AggregateException stored in the task's Exception property. The AggregateException is thrown later when you call the task's Wait(), read its Result property, or call Task.WaitAll(); the original exception is in the aggregate's InnerExceptions collection (InnerException returns the first one).
The exceptions may be deeply nested, for example when child tasks fail, but they can be flattened by calling the "Flatten()" method on an AggregateException.
If you do not handle (or "observe", as Microsoft calls it) a task's exception, then in .Net 4.0 the exception is rethrown when the garbage collector finalizes the task, which terminates the process - not an ideal time. Starting with .Net 4.5 unobserved exceptions are ignored by default, but the UnobservedTaskException event is still raised.
To observe the exception do one of the following:
- invoke ".Wait()" or access ".Result"
- invoke "Task.WaitAll()"
- examine the ".Exception" property
- subscribe to "TaskScheduler.UnobservedTaskException". Useful when using 3rd party TPL libraries. Be sure to call e.SetObserved() inside the handler.
TaskScheduler.UnobservedTaskException +=
new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e) {
e.SetObserved();
}
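Nested exceptions show up when a task has an attached child. In this sketch (all names and messages are made up) the parent throws and its attached child also throws; the child's exception reaches the parent wrapped in its own AggregateException, and Flatten() lifts it to the top level so the caller sees both original exceptions side by side. Calling Wait() inside try/catch is also what "observes" them.

```csharp
using System;
using System.Threading.Tasks;

public static class Observing
{
    // Returns the type names of the exceptions left after flattening, sorted.
    public static string[] FlattenedTypes()
    {
        Task parent = Task.Factory.StartNew(() =>
        {
            // AttachedToParent: the parent does not complete until this child does,
            // and the child's exception is added to the parent's.
            Task.Factory.StartNew(() => throw new InvalidOperationException("child failed"),
                TaskCreationOptions.AttachedToParent);
            throw new ArgumentException("parent failed");
        });

        try
        {
            parent.Wait(); // observes the exceptions by rethrowing them as an AggregateException
        }
        catch (AggregateException ae)
        {
            var flat = ae.Flatten().InnerExceptions;
            var names = new string[flat.Count];
            for (int i = 0; i < flat.Count; i++)
            {
                names[i] = flat[i].GetType().Name;
            }
            Array.Sort(names, StringComparer.Ordinal);
            return names;
        }
        return new string[0];
    }

    public static void Main() => Console.WriteLine(string.Join(", ", FlattenedTypes()));
}
```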
- Cancelling a Task
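Cancelling a task requires cooperation between the calling code and the task itself: the caller requests cancellation through a CancellationTokenSource, and the task checks the token and stops.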
var cancellationTokenSource = new CancellationTokenSource();
var token = cancellationTokenSource.Token;
Task t = Task.Factory.StartNew( () =>
{
while(...) {
if(token.IsCancellationRequested) {
... //cleanup
token.ThrowIfCancellationRequested();//throws OperationCanceledException
}
...//do work
}
}, token);
//in main program
if(need to cancel) cancellationTokenSource.Cancel();
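Filling in the "..." parts with a simulated workload gives a runnable sketch (the 10 ms and 100 ms sleeps are arbitrary). Because the OperationCanceledException is thrown with the same token that was passed to StartNew, the task ends up Canceled rather than Faulted, and Wait() reports it as a TaskCanceledException inside an AggregateException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Cancelling
{
    public static TaskStatus RunAndCancel()
    {
        using (var cancellationTokenSource = new CancellationTokenSource())
        {
            CancellationToken token = cancellationTokenSource.Token;
            Task t = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    // cleanup would go here, before throwing
                    token.ThrowIfCancellationRequested();
                    Thread.Sleep(10); // stands in for a unit of real work
                }
            }, token);

            Thread.Sleep(100);
            cancellationTokenSource.Cancel(); // only a request; the task decides when to stop
            try
            {
                t.Wait();
            }
            catch (AggregateException ae)
            {
                Console.WriteLine(ae.InnerException?.GetType().Name); // TaskCanceledException
            }
            return t.Status;
        }
    }

    public static void Main() => Console.WriteLine(RunAndCancel()); // Canceled
}
```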
- Passing Data to Tasks
Task.Factory.StartNew() has an overload that takes a state object after the lambda; the lambda receives it as its "object" parameter.
Task t = Task.Factory.StartNew( (arg) => {
Console.WriteLine((string)arg);
},"apples");
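The same state overload also works for tasks that return a value, and the state object stays available afterwards in the task's AsyncState property. A minimal sketch (the class name is hypothetical):

```csharp
using System;
using System.Threading.Tasks;

public static class PassingState
{
    public static (string result, object state) Run()
    {
        // "apples" is handed to the lambda as 'arg' and also kept in t.AsyncState
        Task<string> t = Task.Factory.StartNew(arg => ((string)arg).ToUpperInvariant(), "apples");
        return (t.Result, t.AsyncState);
    }

    public static void Main()
    {
        var (result, state) = Run();
        Console.WriteLine($"{state} -> {result}"); // apples -> APPLES
    }
}
```

Passing the value as state rather than capturing a local in the lambda avoids allocating a closure, and the value can be inspected on the task later.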
- Returning a value
With the generic Task<TResult> you can return a value of type TResult.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Practice
{
public class Thread4
{
public static void Main(string[] args)
{
Task<string> task = Task.Run(() =>
{
Thread.Sleep(300);
return "stew";
});
string result = task.Result;
Console.Out.WriteLine("result = {0}", result);
Console.Write("Press 'Enter' to exit.");
Console.In.ReadLine();
}
}
}
- Awaiters and Continuations
It's common to have something you want to execute after a task has finished. This is called a 'continuation'. One way to run a continuation is to get the task's "Awaiter" object with GetAwaiter() and register a delegate with its OnCompleted() method. The delegate runs when the task completes, and inside it GetResult() returns the task's value (or rethrows its exception).
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Practice
{
public class Threads5
{
public static void Main(string[] args)
{
Task<int> task = Task.Run(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(2));
return 17;
});
var awaiter = task.GetAwaiter();
awaiter.OnCompleted(() =>
{
int result = awaiter.GetResult();
Console.Out.WriteLine("result = {0}", result);
});
Console.Write("Press 'Enter' to exit.");
Console.In.ReadLine();
}
}
}
A common scenario is to update the user interface after a compute-intensive task finishes. Only the thread that created a UI component may update it, so the continuation has to run on the UI thread. You do this by passing "TaskScheduler.FromCurrentSynchronizationContext()", called on the UI thread, as the scheduler for the continuation, for example in ContinueWith().
"Task.Run(code);" is shorthand for "Task.Factory.StartNew(code, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);". Use Task.Factory.StartNew() directly when you need other options, such as LongRunning or a custom scheduler.
- General Notes:
When launching many long-running or blocking tasks, i.e., many more than the number of processors, keep in mind that the thread pool behind the TPL assumes each work item finishes quickly. It reacts to blocked workers by slowly injecting extra threads, so the system can end up with far more threads than cores, and thrashing will ensue.
To implement the Producer-Consumer pattern, use a "BlockingCollection".
To get the number of processors, e.g. to decide how many consumers to start, use "System.Environment.ProcessorCount".
Parallel LINQ (PLINQ) is easy to use: just add ".AsParallel()" after the source, "(from employee in elist.AsParallel() select employee.pay).Sum();". PLINQ only works on in-memory data (LINQ to Objects), not on IQueryable providers such as a database.
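A minimal producer-consumer sketch with BlockingCollection, starting one consumer per processor (the item count and bounded capacity of 100 are arbitrary). The producer adds items and then calls CompleteAdding(); each consumer's GetConsumingEnumerable() blocks while the queue is empty and ends once adding is complete and the queue has drained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ProducerConsumer
{
    public static long Run(int items)
    {
        using (var queue = new BlockingCollection<int>(boundedCapacity: 100))
        {
            long total = 0;
            int consumerCount = Environment.ProcessorCount;

            Task[] consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => Task.Run(() =>
                {
                    foreach (int item in queue.GetConsumingEnumerable())
                    {
                        Interlocked.Add(ref total, item); // consumers share the total
                    }
                }))
                .ToArray();

            for (int i = 1; i <= items; i++)
            {
                queue.Add(i); // blocks while the queue already holds 100 items
            }
            queue.CompleteAdding(); // no more items: consumers finish after draining
            Task.WaitAll(consumers);
            return total;
        }
    }

    public static void Main() => Console.WriteLine(Run(1000)); // 500500
}
```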
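The query above can be tried with a hypothetical Employee class and made-up data (the sketch names the property Pay). It also shows AsOrdered(), because PLINQ otherwise does not promise to keep the source order in its results.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqDemo
{
    public class Employee
    {
        public string Name { get; }
        public decimal Pay { get; }
        public Employee(string name, decimal pay) { Name = name; Pay = pay; }
    }

    // AsParallel() turns the rest of the query into a PLINQ query.
    public static decimal TotalPay(List<Employee> elist) =>
        (from employee in elist.AsParallel() select employee.Pay).Sum();

    // AsOrdered() keeps results in source order, at some cost in speed.
    public static int[] EvenNumbersInOrder(int n) =>
        Enumerable.Range(1, n).AsParallel().AsOrdered().Where(x => x % 2 == 0).ToArray();

    public static void Main()
    {
        var elist = new List<Employee> { new Employee("Ann", 100m), new Employee("Bob", 250m) };
        Console.WriteLine(TotalPay(elist));                          // 350
        Console.WriteLine(string.Join(",", EvenNumbersInOrder(10))); // 2,4,6,8,10
    }
}
```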
Threads have one of the following priorities:
enum ThreadPriority { Lowest, BelowNormal, Normal, AboveNormal, Highest }
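Threads are either "foreground" (the default for a new Thread) or "background". A foreground thread keeps the application running after all other threads have finished, while a background thread does not: background threads are stopped when the last foreground thread exits. Thread-pool threads, and therefore tasks, are background threads. This setting does not affect priority.
The older Asynchronous Programming Model (APM) uses a pair of methods: "BeginXxx" to start an operation and "EndXxx" to finish it and get the result.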
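A short sketch of the foreground/background difference (class and method names are made up). The sleeper thread never finishes, but because it is marked as background the process still exits when Main returns; removing the IsBackground line would keep the process alive forever.

```csharp
using System;
using System.Threading;

public static class ForegroundBackground
{
    // A new Thread is a foreground thread unless you change it.
    public static bool NewThreadIsBackground() => new Thread(() => { }).IsBackground;

    public static void Main()
    {
        Console.WriteLine(NewThreadIsBackground()); // False

        var sleeper = new Thread(() => Thread.Sleep(Timeout.Infinite));
        sleeper.IsBackground = true; // without this line the process would never exit
        sleeper.Start();
        Console.WriteLine("Main returns; the background thread is stopped with the process.");
    }
}
```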
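A sketch of the APM pattern, using a MemoryStream only because it offers a BeginRead/EndRead pair without needing files or a network (class and method names are hypothetical). The second method shows Task.Factory.FromAsync, which wraps such a Begin/End pair in a Task so it can be used with the rest of the TPL.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class ApmDemo
{
    // Classic APM: BeginRead starts the read, EndRead waits for it and returns the byte count.
    public static int ReadWithBeginEnd(byte[] data)
    {
        using (var stream = new MemoryStream(data))
        {
            var buffer = new byte[data.Length];
            IAsyncResult ar = stream.BeginRead(buffer, 0, buffer.Length, null, null);
            return stream.EndRead(ar);
        }
    }

    // FromAsync turns the same Begin/End pair into a Task<int>.
    public static int ReadWithFromAsync(byte[] data)
    {
        using (var stream = new MemoryStream(data))
        {
            var buffer = new byte[data.Length];
            Task<int> read = Task.Factory.FromAsync<byte[], int, int, int>(
                stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
            return read.Result;
        }
    }

    public static void Main()
    {
        byte[] data = { 1, 2, 3, 4, 5 };
        Console.WriteLine(ReadWithBeginEnd(data));  // 5
        Console.WriteLine(ReadWithFromAsync(data)); // 5
    }
}
```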
- Async / await
Example from my notes on C# 5.0.
Before C# 5.0, we had to write asynchronous methods with callbacks and be careful about error conditions. With "async" and "await" it's much more straightforward.
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Newtonsoft.Json;
namespace AsyncHttpClient
{
/// <summary>
/// Tiny example of using HttpClient.GetAsync() with Generics.
/// Uses the REST API calls from the most excellent mysafeinfo.com for presidents and Beatles albums
/// This prints:
/// Requesting presidents
/// Requesting Beatles albums
/// ... waiting ...
/// first president = number: '1','', party: '', term: ''
/// first Beatles album = album name: 'Please Please Me (Mono)', date: '1963-03-22T00:00:00'
/// </summary>
public class President
{
public int president;
public string name, party, term;
public override string ToString() { return $"number: '{president}','{name}', party: '{party}', term: '{term}'"; }
}
public class BeatlesAlbum
{
public string album, date;
public override string ToString() { return $"album name: '{album}', date: '{date}'"; }
}
class AsyncHttpClientExample
{
private static void Main()
{
string presidentsUrl = "https://mysafeinfo.com/api/data?list=presidents&format=json";
string beatlesUrl = "https://mysafeinfo.com/api/data?list=beatlesalbums&format=json&select=ent,typ,rd&alias=ent=artist,typ=album,rd=date";
var asyncHttpClientExample = new AsyncHttpClientExample();
Console.Out.WriteLine("Requesting presidents");
var presidents = asyncHttpClientExample.GetAsync<List<President>>(presidentsUrl);
Console.Out.WriteLine("Requesting Beatles albums");
var albums = asyncHttpClientExample.GetAsync<List<BeatlesAlbum>>(beatlesUrl);
Console.Out.WriteLine("... waiting ...");
Console.Out.WriteLine("first president = {0}", presidents.Result[0]);
Console.Out.WriteLine("first Beatles album = {0}", albums.Result[0]);
}
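// One HttpClient reused for all requests; creating one per call can exhaust sockets
private static readonly HttpClient client = new HttpClient();
private async Task<T> GetAsync<T>(string url)
{
HttpResponseMessage response = await client.GetAsync(url).ConfigureAwait(false);
response.EnsureSuccessStatusCode();//throws HttpRequestException on a 4xx/5xx status
//await instead of .Result so the method never blocks a thread
string jsonString = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
T result = JsonConvert.DeserializeObject<T>(jsonString);
return result;
}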
}
}
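The HttpClient example needs network access and the Newtonsoft.Json package. A self-contained sketch of the same ideas uses Task.Delay as a stand-in for the HTTP calls (FetchAsync, FailAsync, and the delays are made up). It starts two operations before awaiting either so they overlap, and it catches an exception from an awaited call with an ordinary try/catch; await rethrows the original exception rather than an AggregateException.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncAwaitDemo
{
    // Stand-in for an I/O call; Task.Delay waits without blocking a thread.
    private static async Task<int> FetchAsync(int value, int delayMs)
    {
        await Task.Delay(delayMs);
        return value * 10;
    }

    private static async Task<int> FailAsync()
    {
        await Task.Delay(50);
        throw new InvalidOperationException("request failed");
    }

    public static async Task<int> SumAsync()
    {
        // Start both calls before awaiting, so they run concurrently.
        Task<int> first = FetchAsync(1, 200);
        Task<int> second = FetchAsync(2, 100);
        int[] results = await Task.WhenAll(first, second);
        return results[0] + results[1];
    }

    public static async Task<string> ErrorMessageAsync()
    {
        try
        {
            await FailAsync();
            return "no error";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await SumAsync());          // 30
        Console.WriteLine(await ErrorMessageAsync()); // request failed
    }
}
```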