Java: Threadkommunikation

Multithreading. Was ist das, und wann brauche ich das? Viel wichtiger, wie wende ich es an?
Multithreading, früher vielleicht noch exotisch, heutzutage ein muss, wenn mann die zur Verfügung stehende Leistung einer CPU voll ausreizen möchte.
Die Nebenläufigkeit von Programmen bedeutet, dass mehrere Vorgänge gleichzeitig ablaufen. Ich brauche dazu zwar nicht mehrere CPU Kerne zu haben, habe ich mehr Threads am Laufen, als Kerne verfügbar, wird es relativ ineffizient, da der Scheduler den Kontext wechseln muss, aber das ist eine andere Geschichte.

Netzwerkprogrammierung ist ein gutes Beispiel für ein nebenläufiges Programm.
socket.receive ist nämlich blocking. Sprich, das Programm wartet auf den Empfang von Paketen. Für immer. Bis etwas da ist. Es geht währenddessen nichts anderes mehr. Solch ein Programm kann man direkt wegwerfen, eine non-blocking-io (z.B. Java NIO) Library verwendet, oder das einfach in einen neuen Thread auslagern:

new Thread(Runnable);

(new Thread(new Runnable(){
    
    @override
    public void run() {
        // etwas tun im Thread
    }

})).start();

// Lambda, wers eilig hat: (Java >=8)

(new Thread(() -> { doSomething(); } )).start();

Run Methode überschreiben, while(true) rein und dort das socket.receive ausführen.
Nächstes Problem: Die Daten möchten in irgendeiner Art und Weise verarbeitet werden, das dauert, jetzt mal übertrieben, 5 Minuten.
Programm läuft zwar weiter, denn die Verarbeitung erfolgt auf dem Netzwerk-Thread. Nur möchten eventuell noch andere Clients auch Anfragen machen und Daten abliefern. Timeout. Es kommt nichts mehr an.
Die Lösung: Auch hier wird nach socket.receive ein neuer Thread mit obiger Methode erstellt, gestartet. In der run Methode, die auf dem neuen Thread ausgeführt wird, wird z.B. eine Methode von irgendeinem anderen Objekt aufgerufen, dass für die Verarbeitung zuständig ist. Mehr muss ja nicht passieren, Funktionsaufruf, es läuft alles auf dem neuen Thread, bis zum letzten Return.
Ein paar hundert Anfragen die Sekunde bedeutet, ein paar hundert Threads werden pro Sekunde erstellt. Das ist ohne Zweifel ein unnötiger Overhead. Ganz zu schweigen von fehlender Kontrolle, wie viel auf einmal parallel ausgeführt wird.

Diesem Szenario stand ich vor 2 Monaten gegenüber.

import java.util.concurrent.ConcurrentLinkedQueue;

public class WorkerThread extends Thread {

    private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private boolean run = true;
    private String name;

    public WorkerThread(String threadName) {
        name = threadName;
        this.setName(name);
        this.start();
    }

    @Override
    public void run() {
        Runnable runnable;
        all: while (run) {
            synchronized (this) {
                while ((runnable = queue.poll()) == null) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        if (!run) {
                            break all;
                        }
                    }
                }
            }
            runnable.run();
        }
    }

    public synchronized void cancel() {
        run = false;
        interrupt();
    }

    public synchronized void add(Runnable runnable) {
        queue.add(runnable);
        notify();
    }
}

In einem Satz, was macht diese Klasse?: WorkerThread ist ein Thread, der nach der Instanziierung sofort losläuft, auf den man Runnables schmeißen kann, die er dann alle nacheinander abarbeitet.

Ungeachtet dessen, habe ich Threadkommunikation bis jetzt nicht einmal angerissen.
Der Vater von allem, das Object, bietet under anderem die Methoden wait und notify.
In einem synchronized Block, oder einer synchronized Methode, wird auf ein Objekt gelockt. Wenn mehrere Threads auf einem Objekt eine synchronized Methode aufrufen, hat das zur Folge, dass nur jeweils ein Thread (nacheinander) exklusiv auf dieses Objekt zugreifen kann.
wait und notify sind nur im synchronized Kontext zulässig.
Ein Mutex, um sich exklusiv Zugriff auf Programmressourcen zu sichern, wird so erstellt:

Object o = new Object();

Danach verpacken in synchronized(o) { // code }. Man kann einfach ein x-beliebiges Objekt nehmen.

Moment, es kann jeweils nur ein Thread (nacheinander) auf die Ressource zugreifen, da kann ich's doch auch gleich lassen mit dem Multithreading, wenn ich an dieser Stelle schon wieder blocke?

Oben sieht man, dass ich java.util.concurrent. importiert habe, eine sehr wichtige Klasse, was Multithreading angeht.
Lassen wir das Weg, statt ConcurrentLinkedQueue, nur eine ArrayList und synchronized, wait, notify fliegen auch alle raus:

public void run() {
    Runnable runnable;
    while (run) {
        while ((runnable = queue.poll()) == null || run) {
            // 100% CPU load
        }
        runnable.run();
    }
}

Ist im Winter ganz nützlich, spart man sich schön Heizkosten mit solchem Code. Das fehlende wait bewirkt nämlich, dass laufend die queue überprüft wird.
Früher oder später explodiert das Ganze mit einer bunten ConcurrentModificationException. Ist ja nicht exklusiv gelockt gewesen.
Bei wait gibt ein Thread den Lock wieder frei, also auf die aktuelle Objektinstanz oder das Objekt im synchronized Block, und geht schlafen.
Bis eine Notification kommt, irgendwo anders im selben Kontext, Mutex auf dem selben Objekt, ein notify ausgeführt wird. Infolgedessen wird der Thread wieder aufgeweckt.

Beim WorkerThread passiert das immer, wenn ein Runnable hinzugefügt wird. Dort wird er aufgeweckt, und geht erst wieder schlafen, wenn keine Runnables mehr übrig sind.

Funktionsweise des WorkerThreads:

    public static void main(String[] args){
        WorkerThread w = new WorkerThread("test");
        System.out.println("asdf");
        for(int i = 0; i < 20; i++){
            w.add(() -> {
                    System.out.println("hello from the worker thread: " + System.currentTimeMillis());
                    try {
                        Thread.sleep((int)(Math.random() * 1000 + 1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                }
            });
        }
        System.out.println("Done adding Runnables");
        //w.cancel();
    }


Den WorkerThread, den hätte ich selbstredend gar nicht erst schreiben brauchen.
java.util.concurrent.Executors
Alles schon da. Im Prinzip ist der WorkerThread eine primitive Variante des newSingleThreadExecutor(), der einen ExecutorService zurück gibt, welcher mit der Methode submit Runnables annimt.
Weiterhin gibt es den newFixedThreadPool(int nThreads), ein Thread-Pool mit festgelegter Threadanzahl, sowie den newCachedThreadPool, der sich automatisch skaliert und bei Nichtnutzung keinerlei Ressourcen verbraucht.
Die scheduled Varianten davon führen ein Runnable in einem festgelegten Intervall fortwährend aus.

Das Beste Feature vom ExecutorService ist jedoch, dass er Exceptions automatisch abfängt.