A common problem in distributed systems is the migration and movement of all kinds of data. Heterogeneity is a major concern here because sources and destinations can be very different. Moreover, moving data involves considerations that are essentially independent of the nature of the source and the destination. To unify the movement of data from a source to a destination, an abstraction is introduced: the pipe. The source and the destination are each determined by a module and its initialization values.
The data source module exports, among others, the functions:
- init(DS_initparam) -> {ok, DS_info, DS_state} | {error, Reason}
Initializes the source with the given initialization parameters. It returns a tuple with the atom ok, information about the data source (DS_info), and the explicit state needed to read from the source (DS_state). If there is any problem, it returns an error.
- read(DS_state) -> {ok, Data, DS_state} | {done, DS_doneparam} | {error, Reason}
Reads the next data block from the data source, returning ok, the data (Data, a binary value), and the state for the next iteration. If all of the data from the data source has been consumed, it returns done and the state needed to close the data source (DS_doneparam). If anything goes wrong, it returns an error.
- done(DS_doneparam) -> ok | {error, Reason}
Cleans up the data source after all data has been consumed.
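To illustrate the protocol, the following is a minimal sketch of a data source backed by an in-memory list of binaries. The module name list_source and the shape of DS_info are assumptions made for illustration only; they do not come from the original system.

```erlang
-module(list_source).   % hypothetical module name
-export([init/1, read/1, done/1]).

%% DS_initparam is a list of binaries.
%% DS_info is {size, TotalBytes} (an assumed shape).
%% DS_state is the list of blocks still to be read.
init(Blocks) when is_list(Blocks) ->
    {ok, {size, iolist_size(Blocks)}, Blocks};
init(_Other) ->
    {error, badarg}.

%% Hand out one block per call; report done when the list is empty.
read([Block | Rest]) -> {ok, Block, Rest};
read([])             -> {done, []}.

%% Nothing to release for an in-memory source.
done(_DS_doneparam) -> ok.
```

Because the state is passed explicitly, the module holds no hidden data between calls: every call to read/1 receives the state returned by the previous one.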
The main functions of the data destination module are:
- init(DS_info, DD_initparam) -> {ok, DD_state} | {error, Reason}
Initializes the destination with the given initialization parameters and the information obtained when the data source was initialized (DS_info). It returns a tuple with the atom ok and the explicit state needed to write to the destination (DD_state). If anything goes wrong, it returns an error.
- write(Data, DD_state) -> {ok, DD_state} | {error, Reason}
Writes the next data block to the data destination, returning ok and the state for the next iteration. If anything goes wrong, it returns an error.
- done(DD_state) -> ok | {error, Reason}
Closes the data destination.
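As an illustration of the destination interface, here is a minimal sketch of a destination that forwards each block to an Erlang process as a message. The module name msg_dest and the message formats {data, Data} and eof are assumptions chosen for this example.

```erlang
-module(msg_dest).      % hypothetical module name
-export([init/2, write/2, done/1]).

%% DD_initparam is the pid of the receiving process.
%% DS_info is ignored by this destination.
init(_DS_info, Pid) when is_pid(Pid) -> {ok, Pid};
init(_DS_info, _Other)               -> {error, badarg}.

%% Forward one block; the state (the pid) is unchanged.
write(Data, Pid) ->
    Pid ! {data, Data},
    {ok, Pid}.

%% Tell the receiver that no more data will arrive.
done(Pid) ->
    Pid ! eof,
    ok.
```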
We now present a simplified implementation of the pipe abstraction; the actual implementation also performs additional functions such as error handling, transmission rate control, and logging. The pipe is created using the function start/2, which spawns a new process for the movement of data. The transfer, implemented by transfer/2, initializes the data source and the data destination and then moves data packets between them (pipe_while/4). Finally, the data source and the data destination are closed.
The tail-recursive function pipe_while/4 reads packets from the data source and writes them to the data destination until the data source is exhausted.
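A minimal sketch of such a pipe module, consistent with the interfaces described above, might look as follows. It is not necessarily the authors' listing: the {Module, InitParam} argument shape and the decision to let the process crash on any error are assumptions.

```erlang
-module(pipe).
-export([start/2, transfer/2]).

%% Spawn a new process that moves the data from source to destination.
%% Assumed argument shape: DS = {DS_module, DS_initparam},
%%                         DD = {DD_module, DD_initparam}.
start(DS, DD) ->
    spawn(fun() -> transfer(DS, DD) end).

%% Initialize both ends, move all the data, then close both ends.
%% Any {error, Reason} result makes a pattern match fail, so the pipe
%% process crashes (this sketch has no explicit error handling).
transfer({DS_module, DS_initparam}, {DD_module, DD_initparam}) ->
    {ok, DS_info, DS_state0} = DS_module:init(DS_initparam),
    {ok, DD_state0} = DD_module:init(DS_info, DD_initparam),
    {DS_doneparam, DD_stateN} =
        pipe_while(DS_module, DS_state0, DD_module, DD_state0),
    ok = DS_module:done(DS_doneparam),
    ok = DD_module:done(DD_stateN).

%% Tail-recursive loop: read a block, write it, and repeat until the
%% source reports done. Returns what is needed to close both ends.
pipe_while(DS_module, DS_state, DD_module, DD_state) ->
    case DS_module:read(DS_state) of
        {ok, Data, DS_statenext} ->
            {ok, DD_statenext} = DD_module:write(Data, DD_state),
            pipe_while(DS_module, DS_statenext, DD_module, DD_statenext);
        {done, DS_doneparam} ->
            {DS_doneparam, DD_state}
    end.
```

Because the recursive call is the last expression in its clause, the loop runs in constant stack space regardless of how many blocks are transferred.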
For example, a data source that reads a file is defined as:
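A minimal sketch of such a module, built on the standard file module, is shown here. The block size and the shape of DS_info are assumptions made for this example.

```erlang
-module(read_file).
-export([init/1, read/1, done/1]).

-define(BLOCK_SIZE, 65536).  % assumed block size in bytes

%% DS_initparam is the file name. DS_info is {file, Name, Size}
%% (an assumed shape). DS_state is the open file descriptor.
init(Filename) ->
    case file:open(Filename, [read, binary]) of
        {ok, Fd} ->
            Info = {file, Filename, filelib:file_size(Filename)},
            {ok, Info, Fd};
        {error, Reason} ->
            {error, Reason}
    end.

%% Read the next block; at end of file, hand back the descriptor so
%% that done/1 can close it.
read(Fd) ->
    case file:read(Fd, ?BLOCK_SIZE) of
        {ok, Data}      -> {ok, Data, Fd};
        eof             -> {done, Fd};
        {error, Reason} -> {error, Reason}
    end.

%% Close the file; file:close/1 returns ok | {error, Reason}.
done(Fd) ->
    file:close(Fd).
```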
The module uses the file module to open, read, and close a file. Similarly, a data destination that stores all data in a file can be defined as:
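A minimal sketch of the destination counterpart, again built on the standard file module, is shown here. Ignoring DS_info is an assumption; a fuller version might use it, for example to check the available space.

```erlang
-module(write_file).
-export([init/2, write/2, done/1]).

%% DD_initparam is the file name; DS_info is ignored in this sketch.
%% DD_state is the open file descriptor.
init(_DS_info, Filename) ->
    case file:open(Filename, [write, binary]) of
        {ok, Fd}        -> {ok, Fd};
        {error, Reason} -> {error, Reason}
    end.

%% Append the next block to the file.
write(Data, Fd) ->
    case file:write(Fd, Data) of
        ok              -> {ok, Fd};
        {error, Reason} -> {error, Reason}
    end.

%% Close the file; file:close/1 returns ok | {error, Reason}.
done(Fd) ->
    file:close(Fd).
```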
Copying a file can thus be implemented with a pipe that uses read_file and write_file:
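As a sketch, the call could be wrapped as follows. The module name copy_example is hypothetical, and the {Module, InitParam} argument shape passed to pipe:start/2 is an assumption about how the pipe receives its source and destination.

```erlang
-module(copy_example).  % hypothetical module name
-export([copy/2]).

%% Start a pipe process that copies the file From to the file To.
%% Assumes modules pipe, read_file and write_file exist as described
%% in the text. Returns the pid of the pipe process; the copy itself
%% runs asynchronously in that process.
copy(From, To) ->
    pipe:start({read_file, From}, {write_file, To}).
```

Replacing either tuple with a different module changes where the data comes from or goes to without touching the pipe itself.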
Fig. 4. Pipe linking in a transmission
Figure 4 shows the pipe chain of a streaming session.




