14.9 Kommunikation zwischen Threads mit Pipes 

Die Kommunikation zwischen Programmteilen kann auf vielfältige Weise geschehen. Einige Möglichkeiten haben wir bei Threads kennengelernt. Bei getrennten Programmen lässt sich die Kommunikation über Dateien realisieren. Auch Datenströme können von einem Teil geschrieben und vom anderen gelesen werden. Wenn wir jedoch mit Threads arbeiten, wäre eine Kommunikation über Dateien zwar denkbar, aber zu aufwändig. Ein anderes Stromkonzept ist praktisch.
14.9.1 PipedOutputStream und PipedInputStream 

Einfacher ist der Austausch der Daten über eine so genannte Pipe. Sie wird gebildet über Paare spezieller Stromklassen PipedOutputStream/PipedInputStream beziehungsweise PipedWriter/PipedReader, übliche Unterklassen von OutputStream/InputStream und Writer/Reader. (Wir verfolgen im nächsten Beispiel die Byte-Variante.) Wenn dann Threads Daten austauschen wollen, kann ein Produzent sie über write() in den Ausgabestrom schreiben, und der andere Thread wird sie dort über read() empfangen können.
Natürlich muss ein schreibender Pipe-Strom wissen, wer der Empfänger ist. Daher müssen die Schreib-/Lese-Pipes miteinander verbunden werden. Eine Möglichkeit bietet connect().
Beispiel Ein PipedOutputStream soll mit einem PipedInputStream verbunden werden. PipedOutputStream pos = new PipedOutputStream(); PipedInputStream pis = new PipedInputStream(); |
pos.connect( pis ); // oder pis.connect( pos ); Werden jetzt Daten produziert und in den pos geschrieben, kommen sie über den pis wieder an und können dort konsumiert werden. |
Ob wir nun vom PipedOutputStream die Methode connect(PipedInputStream) nehmen oder vom PipedInputStream die Methode connect(PipedOutputStream), ist dabei egal.
Anstatt nach dem Aufbau der Ströme über den Standard-Konstruktor beide mit connect() zu verbinden, gibt es eine alternative Lösung: Entweder lässt sich nach dem Erzeugen des Piped-OutputStream über den Standard-Konstruktor das frische Strom-Objekt in den parametrisierten Konstruktor von PipedInputStream übergeben oder eben umgekehrt ein neues PipedInputStream-Objekt in den parametrisierten Konstruktor von PipedOutputStream legen.
Beispiel Verbinde den Eingabe-Stream pis mit dem Ausgabe-Stream pos: PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream( pis ); |
Interna
Der Austausch der Daten geschieht über einen internen Puffer, den PipedInputStream anlegt. Die Daten, die PipedOutputStream über write() schreiben soll, gelangen direkt zum Puffer des Eingabestroms. Werfen wir einen kurzen Blick auf die relevanten Teile der Implementierung:
class PipedOutputStream extends OutputStream { private PipedInputStream sink; public PipedOutputStream( PipedInputStream snk ) throws IOException { /* Auskommentierte Fehlerbehandlung */ sink = snk; snk.in = –1; snk.out = 0; snk.connected = true; } public void write( int b ) throws IOException { if ( sink == null ) throw new IOException( "Pipe not connected" ); sink.receive( b ); } }
Der PipedInputStream nutzt intern einen Puffer von standardmäßig 1.024 Elementen. Das bedeutet: Der Schreibende kann standardmäßig bis zu 1.024 Byte (oder Zeichen bei Piped-Reader) produzieren, bis die Kommunikation stoppen muss. Denn mit dieser Größe ist der Puffer voll und der Produzent blockiert; der Lesende muss den Puffer erst leeren, damit der Konsument weiterarbeiten darf. Umgekehrt bedeutet dies, dass der lesende Thread bei ungenügend vielen Zeichen warten muss, bis der Schreiber die nötige Anzahl hinterlegt hat. Dafür wird intern mittels Thread-Synchronisation gearbeitet. Lebt die andere Seite nicht mehr, gibt es eine IOException.
Seit Java 6 lässt sich die Größe über einen Konstruktor wie PipedInputStream(int pipeSize), PipedInputStream(PipedOutputStream src, int pipeSize), PipedReader(int pipeSize) oder PipedReader(PipedWriter src, int pipeSize) setzen.
14.9.2 PipedWriter und PipedReader 

Die Klassen PipedWriter und PipedReader sind die char-Varianten für die sonst byte-orientierten Klassen PipedOutputStream und PipedInputStream. Diese sollen uns als Beispiel dienen. Zwei Threads arbeiten miteinander und tauschen Daten aus. Der eine Thread produziert Zufallszahlen, die ein anderer Thread auf dem Bildschirm darstellt.
Listing 14.29 com/tutego/insel/io/stream/PipeDemo.java, PipeRandomWriter
package com.tutego.insel.io.stream; import java.io.*; class PipeRandomWriter extends PipedWriter implements Runnable { @Override public void run() { while ( true ) { try { write( String.format("%f%n", Math.random()) ); Thread.sleep( 200 ); } catch ( Exception e ) { e.printStackTrace(); } } } }
Der Thread ist eine Spezialisierung von PipedWriter und produziert in run() endlos Zufallszahlen, die in den Ausgabestrom vom PipedWriter geschoben werden. Der PipeRandomReader wiederum ist ein PipedReader, der über einen BufferedReader alle geschriebenen Zeilen ausliest.
Listing 14.30 com/tutego/insel/io/stream/PipeDemo.java, PipeRandomReader
class PipeRandomReader extends PipedReader implements Runnable { @Override public void run() { BufferedReader br = new BufferedReader( this ); while ( true ) try { System.out.println( br.readLine() ); } catch ( IOException e ) { e.printStackTrace(); } } }
Das Hauptprogramm erzeugt die beiden spezialisierten Pipes und verbindet sie. Danach werden die Threads gestartet.
Listing 14.31 com/tutego/insel/io/stream/PipeDemo.java, PipeDemo
public class PipeDemo { public static void main( String[] args ) throws Exception { PipeRandomWriter out = new PipeRandomWriter(); PipeRandomReader in = new PipeRandomReader(); in.connect( out ); new Thread( out ).start(); new Thread( in ).start(); } }