Im vorhergehenden Artikel dieser Artikelreihe habe ich Ihnen gezeigt wie Sie mittels Parallel.For und Parallel.ForEach Schleifen erstellen können, deren Iterationsschritte parallel verarbeitet werden. In diesem Artikel möchte ich daran anknüpfen und Ihnen zeigen wie die Fehlerbehandlung und die Threadsicherheit bei parallelen Schleifen umgesetzt werden kann.
Schleifen mit Index
Manchmal ist es notwendig den Index einer Iteration zu kennen. Zu diesem Zweck wird in der parallelen ForEach Schleife eine Aktion mit drei Parametern verwendet. Der dritte Parameter gibt den Schleifenindex wieder. Der nachfolgende Quellcode zeigt ein entsprechendes Beispiel.
1 static void Main(string[] args)
2 {
3 List<string> values = new List<string>() { "A", "B", "C", "D", "E", "F" };
4
5 Parallel.ForEach(
6 values,
7 (value, loopState, index) => DoWork(value, loopState, index));
8
9 Console.ReadKey();
10 }
11
12 static void DoWork(string value, ParallelLoopState loopState, long index)
13 {
14 Console.Write(index + ":" + value + " ");
15 }
Iterationsübergreifende Aktionen
Im vorhergehenden Teil dieser Artikelserie wurde nur der Fall betrachtet dass in einer Schleife jeweils voneinander unabhängige Vorgänge durchgeführt werden. Wie aber lassen sich threadsichere iterationsübergreifende Aktionen realisieren? Betrachten wir ein kleines Beispiel um diese Frage zu beantworten: Es soll die Summe aus den Zahlen 1 bis 1000000 gebildet werden.
Die sequentielle Schleife für diese Funktionalität ist schnell erstellt:
1 static void Main(string[] args)
2 {
3 double sum = 0;
4
5 for (int i = 1; i <= 1000000; i++)
6 {
7 sum = sum + i;
8 }
9
10 Console.WriteLine(sum);
11
12 Console.ReadKey();
13 }
Als Summe wird 500000500000 ausgegeben.
Wenn die Summenbildung in einer parallelen Schleife implementiert wird, dann muss der Zugriff auf die Summenvariable threadsicher erfolgen, beispielsweise mittels lock oder Monitor.
1 static void Main(string[] args)
2 {
3 double sum = 0;
4 object locker = new object();
5
6 Parallel.For(1, 1000001, i =>
7 {
8 lock (locker)
9 {
10 sum = sum + i;
11 }
12 });
13
14 Console.WriteLine(sum);
15
16 Console.ReadKey();
17 }
Der Nachteil dieser Lösung ist, dass eine Millionen Mal gelockt werden muss. Dies ist aber eigentlich nicht nötig da nicht so viele parallele Threads erzeugt werden, ausser Sie haben einen Rechner mit einer Millionen Prozessorkernen. Durch das Framework wird bei der Ausführung der parallelen Schleife nur ein Bruchteil dieser Millionen Threads erzeugt. Jeder dieser Threads prozessiert dabei mehrere Schleifeniterationen. Somit muss nur das Zusammenführen der Teilergebnisse threadsicher ausgeführt werden.
Die parallele Schleife unterstützt diesen Aspekt. Der Schleife müssen dabei folgende Werte übergeben werden:
- Startwert
- Endwert
- Initialisierung der lokalen Variable für einen Thread
- Aktion die im Thread je Iteration ausgelöst wird
- Aktion welche am Ende des Threads und damit threadübergreifend ausgelöst wird
Der folgende Quellcode zeigt ein entsprechendes Beispiel für die parallele Summenbildung.
1 static void Main(string[] args)
2 {
3 double sum = 0;
4 object locker = new object();
5
6 Parallel.For(
7 1,
8 1000001,
9 () => 0.0,
10 (i, loopState, localSum) => localSum + i,
11 localSum =>
12 {
13 lock (locker)
14 {
15 sum = sum + localSum;
16 }
17 });
18
19 Console.WriteLine(sum);
20
21 Console.ReadKey();
22 }
In Zeile 9 wird die lokale Variable für den Thread initialisiert. In Zeile 10 wird die lokale Funktion aufgerufen. In dieser erfolgt die lokale Summenbildung. Diese wird innerhalb des Threads ausgeführt und muss daher nicht durch Lock-Mechanismen geschützt werden. In Zeile 11 bis 17 wird die globale Funktion aufgerufen. In dieser wird übergreifend über alle Threads die Gesamtsumme aus den Teilsummen gebildet. Hierbei muss ein entsprechendes lock erfolgen.
Dieses etwas komplexe Konstrukt für parallele Schleifen zeigt wie ein threadübergreifendes Zusammenspiel der einzelnen Schleifeniterationen realisiert werden kann. Für solch einfache Aufgabenstellungen wie beispielsweise die hier gezeigte Summenbildung gibt es aber bessere Alternativen. Die Summenbildung lässt sich mittels PLINQ (parallel LINQ) viel eleganter lösen. Der folgende Quellcode zeigt die entsprechend angepasste Anwendung.
1 static void Main(string[] args)
2 {
3 double sum = 0;
4
5 sum = ParallelEnumerable.Range(1, 10000000).Sum(i => (double)i);
6
7 Console.WriteLine(sum);
8
9 Console.ReadKey();
10 }
Exception Handling
Enthält eine Anwendung parallel laufende Threads, dann stellt sich immer die Frage: Was geschieht wenn in den Threads Exceptions geworfen werden? Bei den parallelen Schleifen ist diese Fragestellung daher ebenfalls relevant.
Im Fall der parallelen Schleifen wird dem Entwickler bereits viel Arbeit abgenommen und ihm ein komfortabler Umgang mit Exceptions geboten. Alle Exceptions der einzelnen Schleifeniterationen werden gefangen und gemeinsam als AggregateException geworfen. Die parallele Schleife kann somit in einem try-catch Block diese AggregateException abfangen und die Liste der gesammelten Fehler auswerten. Das nachfolgende Beispiel zeigt wie zwei verschiedene Fehler geworfen und gefangen werden.
1 static void Main(string[] args)
2 {
3 try
4 {
5 Parallel.For(1, 10, i => DoWork(i));
6 }
7 catch (AggregateException exc)
8 {
9 Console.WriteLine("Main Exception: " +
10 exc.GetType().ToString());
11
12 foreach (Exception innerException in exc.InnerExceptions)
13 {
14 Console.WriteLine("Inner Exception: " +
15 innerException.GetType().ToString());
16 }
17 }
18
19 Console.ReadKey();
20 }
21
22 static void DoWork(int i)
23 {
24 if (i == 5)
25 {
26 throw new NotSupportedException();
27 }
28
29 if (i == 7)
30 {
31 throw new NotImplementedException();
32 }
33 }
In der AggregateException sind alle Ausnahmen gesammelt. Einfach all diese Exeptions abzufangen und gleich zu behandeln gehört aber mit Sicherheit nicht zum guten Programmierstyl. Vielmehr sollten nur die erwarteten Ausnahmen explizit abgefangen und behandelt werden. Die Klasse AggregateException bietet dafür die Handle Funktion an. Dabei wird jede enthaltene Exception der angegebenen Prädikatfunktion übergeben und kann in dieser gesondert behandelt werden. Die Prädikatfunktion gibt true oder false zurück, je nachdem ob die Ausnahme behandelt wurde oder nicht. Alle weiterhin unbehandelten Ausnahmen werden in einer neuen AggregatException nochmals geworfen. Das folgende Beispiel zeigt dieses Vorgehen. Dabei wird nur die NotSupportedException behandelt. Somit wird die NotImplementedException erneut innerhalb einer zweiten AggreageException geworfen. Um dies zu visualisieren wurde im Beispiel ein zweiter try-catch Block verwendet.
1 static void Main(string[] args)
2 {
3 try
4 {
5 try
6 {
7 Parallel.For(1, 10, i => DoWork(i));
8 }
9 catch (AggregateException exc)
10 {
11 exc.Handle(exception =>
12 {
13 if (exception is NotSupportedException)
14 {
15 Console.WriteLine("Handled: " +
16 exc.GetType().ToString());
17
18 //set exception as handled
19 return true;
20 }
21
22 //rethrow all other exceptions
23 return false;
24 });
25 }
26 }
27 catch (AggregateException exc)
28 {
29 foreach (Exception innerException in exc.InnerExceptions)
30 {
31 Console.WriteLine("Unhandled: " +
32 innerException.GetType().ToString());
33 }
34 }
35
36 Console.ReadKey();
37 }
38
39 static void DoWork(int i)
40 {
41 if (i == 5)
42 {
43 throw new NotSupportedException();
44 }
45
46 if (i == 7)
47 {
48 throw new NotImplementedException();
49 }
50 }
Fazit
Die Fehlerbehandlung in parallelen Schleifen wird sehr gut unterstützt. Die Fehler der in parallelen Threads ausgeführten Iterationsschritte werden in einer gemeinsamen AggregateException gesammelt und lassen sich leicht auswerten.
Des Weiteren unterstützten die parallelen Schleifen eine effiziente threadsichere Benutzung gemeinsamer Ressourcen. Das hier vorgestellte Konstrukt mit threadeigenen und threadübergreifenden Funktionen wird aber in der Praxis meist durch wesentlich besser lesbare PLINQ Anweisungen ersetzt.
if you choose to cerate two separate loops there is one pitfall you have to watch out for: take for example the number 15. It is divisible by both 3 and 5. for that reason it will be included in both loops which means your result will have duplicate numbers added to it like 15; 30; 45; The way to avoid this while still using two loops makes the whole method longer. And so one loop that checks both at the same time is the best