Installing BigMultiPipe

The bigmultipipe module is available on PyPI and can be installed with pip:

pip install bigmultipipe

Source code development is conducted on GitHub:

git clone git://github.com/jpmorgen/BigMultiPipe.git

Discussion of use

The bigmultipipe module provides tools that enable a flexible, modular approach to constructing data processing pipelines that optimize computer processing, memory, and disk I/O resources. The BigMultiPipe base class is subclassed by the user to connect the file reading, file writing, and data processing methods to the user’s existing processing code. The BigMultiPipe.pipeline() method runs the pipeline, maximizing the host computer’s processing resources. Keywords are available to tailor memory and processor use, the most important of which is process_size, the maximum size in bytes of an individual process. Two optional keywords, pre_process_list and post_process_list, can contain lists of functions to be run on the data before and after the primary processing step. These keywords enable additional flexibility in creating and modifying the pipeline at object instantiation and/or pipeline runtime.
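
For orientation, here is a brief sketch of how these keywords fit together. It is not runnable on its own: DemoMultiPipe1, boost_target, and boost_amount come from the Example section below, and the process_size value is purely illustrative.

>>> # dmp = DemoMultiPipe1(process_size=2e9,   # cap each worker process at ~2 GB
>>> #                      boost_target=3)     # custom keyword picked up by the processing code
>>> # pout = dmp.pipeline(in_names, outdir='/tmp',
>>> #                     boost_amount=5)      # keywords here supplement or override the above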

Discussion of Design

The bigmultipipe module uses a three-stream approach to address the problem of parallel processing of large data structures. Stream (1) is the data, which begins and ends on disk, thus side-stepping the issues of inter-process communication discussed in the Background section. Stream (2) is control. This stream is intended to control the primary processing step, but can also control pre-processing, post-processing, file name creation, and file writing. The control stream starts as the keywords provided to a BigMultiPipe object on instantiation. Using Python’s flexible **kwargs feature, these keywords can be supplemented or overridden when the BigMultiPipe.pipeline() method is called. The functions in pre_process_list can similarly supplement or override these keywords. Finally, there is stream (3), the output metadata. Stream (3) is returned to the caller along with the output filename of each processed file for use in subsequent processing steps, and it can be used to minimize the number of times the large output data files are re-read during subsequent processing. As discussed in the Background section, the amount of information returned as metadata should be modest in size. The bigmultipipe.bmp_cleanup() function can be used to remove large items from the metadata, if necessary. As discussed in the notes of that function, the ability to store and remove large metadata items in each process’s metadata stream can assist in building modular pipeline processing systems. Alternatively, large metadata items not intended for return to the calling process can be stored in an object passed via the control stream (i.e. as a keyword of BigMultiPipe or BigMultiPipe.pipeline()).
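
As a small, self-contained illustration of stream (3), the snippet below builds by hand the kind of list of (output filename, metadata) pairs that BigMultiPipe.pipeline() returns (the filenames and metadata values here are made up) and uses the metadata alone to drive a later selection step:

>>> # Hypothetical pipeline() return value: one (outname, metadata) pair per processed file
>>> pout = [('/tmp/a_bmp.npy', {'average': 10.0}),
>>>         ('/tmp/b_bmp.npy', {'average': 60.0})]
>>> # A later step can select files using only the modest-sized metadata,
>>> # without re-reading the large output files
>>> bright = [outname for outname, meta in pout if meta['average'] > 50]
>>> print(bright)
['/tmp/b_bmp.npy']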

Background

The parallel pipeline processing of large data structures is best done in a multithreaded environment, which enables the data to be easily shared between threads executing in the same process. Unfortunately, Python’s Global Interpreter Lock (GIL) prevents more than one thread from executing Python code at a time, except in certain cases, such as I/O waits and some numpy array operations. Python’s multiprocessing module provides a partial solution to the GIL dilemma. The multiprocessing module launches multiple independent Python processes, thus providing true concurrent parallel processing in a way that does not depend on the underlying code being executed. The multiprocessing module also provides tools, such as multiprocessing.Pipe and multiprocessing.Queue, that enable communication between these processes. Unfortunately, these inter-process communication solutions are not as flexible as shared memory between threads in one process because data must be transferred. The transfer is done using pickle: data are pickled on one end of the pipe and unpickled on the other. Depending on the complexity and size of the object, the pickle/unpickle process can be very inefficient. The bigmultipipe module provides a basic framework for avoiding these problems by implementing the three-stream approach described in the Discussion of Design section. Inter-process communication requiring pickle still occurs; however, only filenames and (hopefully) modest-sized metadata are exchanged in this way.
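
To make the trade-off concrete, here is a sketch of the pattern that bigmultipipe automates, written with bare multiprocessing: each worker reads and writes the large data itself, so only a short filename string crosses the process boundary via pickle. The in_names list is hypothetical, so the Pool call is left commented out.

>>> import multiprocessing
>>> import numpy as np
>>>
>>> def process_one(in_name):
>>>     data = np.load(in_name)        # the large array stays inside the worker
>>>     outname = in_name + '_out'
>>>     np.save(outname, data * 10)    # the result goes straight back to disk
>>>     return outname                 # only this small string is pickled back
>>>
>>> # with multiprocessing.Pool() as pool:
>>> #     out_names = pool.map(process_one, in_names)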

Statement of scope

This module is best suited for the simple case of a “straight through” pipeline: one input file to one output file. For more complicated pipeline topologies, the MPipe module may be useful. For parallel processing of loops that include certain numpy operations and other optimization tools, numba may be useful. Although this has yet to be tested, bigmultipipe should be mutually compatible with either or both of these other packages; note that the bigmultipipe.NoDaemonPool version of multiprocessing.pool.Pool may need to be used if multiple levels of multiprocessing are being conducted.
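
For the nested multiprocessing case, here is a hedged sketch; run_one_pipeline and dir_list are hypothetical, and it assumes NoDaemonPool is a drop-in replacement for multiprocessing.pool.Pool, as described above.

>>> # from bigmultipipe import NoDaemonPool
>>> #
>>> # # A daemonic multiprocessing.pool.Pool worker may not spawn child
>>> # # processes of its own; a NoDaemonPool worker can, so each
>>> # # run_one_pipeline call is free to launch its own
>>> # # BigMultiPipe.pipeline() worker processes
>>> # with NoDaemonPool(processes=2) as pool:
>>> #     results = pool.map(run_one_pipeline, dir_list)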

Example

The following code shows how to develop a bigmultipipe pipeline starting from code that processes large files one at a time in a simple for loop.

First, the for loop case:

>>> import os
>>> from tempfile import TemporaryDirectory
>>> import numpy as np
>>> # Write some large files
>>> with TemporaryDirectory() as tmpdirname:
>>>     in_names = []
>>>     for i in range(10):
>>>         outname = f'big_array_{i}.npy'
>>>         outname = os.path.join(tmpdirname, outname)
>>>         a = i + np.zeros((1000,2000))
>>>         np.save(outname, a)
>>>         in_names.append(outname)
>>>
>>>     # Process with traditional for loop
>>>     reject_value = 2
>>>     boost_target = 3
>>>     boost_amount = 5
>>>     outnames = []
>>>     kept_in_names = []
>>>     meta = []
>>>     for f in in_names:
>>>         # File read step
>>>         data = np.load(f)
>>>         # Pre-processing steps
>>>         if data[0,0] == reject_value:
>>>             continue
>>>         # Keep only the input names that were not rejected
>>>         kept_in_names.append(f)
>>>         if data[0,0] == boost_target:
>>>             flag_to_boost_later = True
>>>         else:
>>>             flag_to_boost_later = False
>>>         # Processing step
>>>         data = data * 10
>>>         # Post-processing steps
>>>         if flag_to_boost_later:
>>>             data = data + boost_amount
>>>         meta.append({'median': np.median(data),
>>>                      'average': np.average(data)})
>>>         outname = f + '_bmp'
>>>         np.save(outname, data)
>>>         outnames.append(outname)
>>>     cleaned_innames = [os.path.basename(f) for f in kept_in_names]
>>>     cleaned_outnames = [os.path.basename(f) for f in outnames]
>>>     cleaned_pout = zip(cleaned_innames, cleaned_outnames, meta)
>>>     print(list(cleaned_pout))
>>>
[('big_array_0.npy', 'big_array_0.npy_bmp', {'median': 0.0, 'average': 0.0}), ('big_array_1.npy', 'big_array_1.npy_bmp', {'median': 10.0, 'average': 10.0}), ('big_array_3.npy', 'big_array_3.npy_bmp', {'median': 35.0, 'average': 35.0}), ('big_array_4.npy', 'big_array_4.npy_bmp', {'median': 40.0, 'average': 40.0}), ('big_array_5.npy', 'big_array_5.npy_bmp', {'median': 50.0, 'average': 50.0}), ('big_array_6.npy', 'big_array_6.npy_bmp', {'median': 60.0, 'average': 60.0}), ('big_array_7.npy', 'big_array_7.npy_bmp', {'median': 70.0, 'average': 70.0}), ('big_array_8.npy', 'big_array_8.npy_bmp', {'median': 80.0, 'average': 80.0}), ('big_array_9.npy', 'big_array_9.npy_bmp', {'median': 90.0, 'average': 90.0})]

Now let’s parallelize with bigmultipipe in a few different ways:

  1. Put all code into methods in a subclass of BigMultiPipe

>>> import os
>>> from tempfile import TemporaryDirectory
>>> import numpy as np
>>>
>>> from bigmultipipe import BigMultiPipe, prune_pout
>>>
>>> class DemoMultiPipe1(BigMultiPipe):
>>>
>>>     def file_read(self, in_name, **kwargs):
>>>         data = np.load(in_name)
>>>         return data
>>>
>>>     def file_write(self, data, outname, **kwargs):
>>>         np.save(outname, data)
>>>         return outname
>>>
>>>     def data_process_meta_create(self, data,
>>>                                  reject_value=None,
>>>                                  boost_target=None,
>>>                                  boost_amount=0,
>>>                                  **kwargs):
>>>         # Pre-processing steps
>>>         if reject_value is not None:
>>>             if data[0,0] == reject_value:
>>>                 return (None, {})
>>>         if (boost_target is not None
>>>                 and data[0,0] == boost_target):
>>>             flag_to_boost_later = True
>>>         else:
>>>             flag_to_boost_later = False
>>>         # Processing step
>>>         data = data * 10
>>>         # Post-processing steps
>>>         if flag_to_boost_later:
>>>             data = data + boost_amount
>>>         meta = {'median': np.median(data),
>>>                 'average': np.average(data)}
>>>         return (data, meta)
>>> # Write large files and process with DemoMultiPipe1
>>> with TemporaryDirectory() as tmpdirname:
>>>     in_names = []
>>>     for i in range(10):
>>>         outname = f'big_array_{i}.npy'
>>>         outname = os.path.join(tmpdirname, outname)
>>>         a = i + np.zeros((1000,2000))
>>>         np.save(outname, a)
>>>         in_names.append(outname)
>>>
>>>     dmp = DemoMultiPipe1(boost_target=3, outdir=tmpdirname)
>>>     pout = dmp.pipeline(in_names, reject_value=2,
>>>                         boost_amount=5)
>>>
>>> # Prune entries whose outname is ``None`` (rejected files) and strip
>>> # the directory from the names for display
>>> pruned_pout, pruned_in_names = prune_pout(pout, in_names)
>>> pruned_outnames, pruned_meta = zip(*pruned_pout)
>>> pruned_outnames = [os.path.basename(f) for f in pruned_outnames]
>>> pruned_in_names = [os.path.basename(f) for f in pruned_in_names]
>>> pretty_print = zip(pruned_in_names, pruned_outnames, pruned_meta)
>>> print(list(pretty_print))
[('big_array_0.npy', 'big_array_0_bmp.npy', {'median': 0.0, 'average': 0.0}), ('big_array_1.npy', 'big_array_1_bmp.npy', {'median': 10.0, 'average': 10.0}), ('big_array_3.npy', 'big_array_3_bmp.npy', {'median': 35.0, 'average': 35.0}), ('big_array_4.npy', 'big_array_4_bmp.npy', {'median': 40.0, 'average': 40.0}), ('big_array_5.npy', 'big_array_5_bmp.npy', {'median': 50.0, 'average': 50.0}), ('big_array_6.npy', 'big_array_6_bmp.npy', {'median': 60.0, 'average': 60.0}), ('big_array_7.npy', 'big_array_7_bmp.npy', {'median': 70.0, 'average': 70.0}), ('big_array_8.npy', 'big_array_8_bmp.npy', {'median': 80.0, 'average': 80.0}), ('big_array_9.npy', 'big_array_9_bmp.npy', {'median': 90.0, 'average': 90.0})]

Note

We override data_process_meta_create() because we are both processing data and creating metadata in the same step.

Note

The outname_append parameter and outname_create() method of BigMultiPipe make it easy to tailor the look of the output filenames. The convenience function prune_pout() makes it easy to keep the input and output filename lists synchronized when files are rejected.
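
For example, the output names above insert '_bmp' before the extension; here is a hedged sketch of tailoring that with outname_append, assuming it is accepted as a keyword alongside the others used here:

>>> # dmp = DemoMultiPipe1(boost_target=3, outdir=tmpdirname,
>>> #                      outname_append='_reduced')
>>> # --> output names like 'big_array_0_reduced.npy'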

  2. Let’s use the pre_process_list and post_process_list parameters, which allow us to assemble a pipeline at object instantiation time or at pipeline run time:

>>> import os
>>> from tempfile import TemporaryDirectory
>>> import numpy as np
>>>
>>> from bigmultipipe import BigMultiPipe, prune_pout
>>>
>>> def reject(data, reject_value=None, **kwargs):
>>>     """Example pre-processing function to reject data"""
>>>     if reject_value is None:
>>>         return data
>>>     if data[0,0] == reject_value:
>>>         # --> Return data=None to reject data
>>>         return None
>>>     return data
>>>
>>> def boost_later(data, boost_target=None, boost_amount=None, **kwargs):
>>>     """Example pre-processing function that shows how to alter kwargs"""
>>>     if boost_target is None or boost_amount is None:
>>>         return data
>>>     if data[0,0] == boost_target:
>>>         add_kwargs = {'need_to_boost_by': boost_amount}
>>>         retval = {'bmp_data': data,
>>>                   'bmp_kwargs': add_kwargs}
>>>         return retval
>>>     return data
>>>
>>> def later_booster(data, need_to_boost_by=None, **kwargs):
>>>     """Example post-processing function.  Interprets keyword set by boost_later"""
>>>     if need_to_boost_by is not None:
>>>         data = data + need_to_boost_by
>>>     return data
>>>
>>> def median(data, bmp_meta=None, **kwargs):
>>>     """Example metadata generator"""
>>>     median = np.median(data)
>>>     if bmp_meta is not None:
>>>         bmp_meta['median'] = median
>>>     return data
>>>
>>> def average(data, bmp_meta=None, **kwargs):
>>>     """Example metadata generator"""
>>>     av = np.average(data)
>>>     local_meta = {'average': av}
>>>     if bmp_meta is not None:
>>>         bmp_meta.update(local_meta)
>>>     return data
>>>
>>> class DemoMultiPipe2(BigMultiPipe):
>>>
>>>     def file_read(self, in_name, **kwargs):
>>>         data = np.load(in_name)
>>>         return data
>>>
>>>     def file_write(self, data, outname, **kwargs):
>>>         np.save(outname, data)
>>>         return outname
>>>
>>>     def data_process(self, data, **kwargs):
>>>         return data * 10
>>>
>>> # Write large files and process with DemoMultiPipe2
>>> with TemporaryDirectory() as tmpdirname:
>>>     in_names = []
>>>     for i in range(10):
>>>         outname = f'big_array_{i}.npy'
>>>         outname = os.path.join(tmpdirname, outname)
>>>         a = i + np.zeros((1000,2000))
>>>         np.save(outname, a)
>>>         in_names.append(outname)
>>>
>>>     # Create a pipeline using the pre- and post-processing
>>>     # components defined above.  This enables the pipeline to be
>>>     # assembled at instantiation and controlled at either
>>>     # instantiation or runtime
>>>     dmp = DemoMultiPipe2(pre_process_list=[reject, boost_later],
>>>                          post_process_list=[later_booster, median, average],
>>>                          boost_target=3, outdir=tmpdirname)
>>>     pout = dmp.pipeline(in_names, reject_value=2,
>>>                         boost_amount=5)
>>>
>>> # Prune entries whose outname is ``None`` (rejected files) and strip
>>> # the directory from the names for display
>>> pruned_pout, pruned_in_names = prune_pout(pout, in_names)
>>> pruned_outnames, pruned_meta = zip(*pruned_pout)
>>> pruned_outnames = [os.path.basename(f) for f in pruned_outnames]
>>> pruned_in_names = [os.path.basename(f) for f in pruned_in_names]
>>> pretty_print = zip(pruned_in_names, pruned_outnames, pruned_meta)
>>> print(list(pretty_print))
[('big_array_0.npy', 'big_array_0_bmp.npy', {'median': 0.0, 'average': 0.0}), ('big_array_1.npy', 'big_array_1_bmp.npy', {'median': 10.0, 'average': 10.0}), ('big_array_3.npy', 'big_array_3_bmp.npy', {'median': 35.0, 'average': 35.0}), ('big_array_4.npy', 'big_array_4_bmp.npy', {'median': 40.0, 'average': 40.0}), ('big_array_5.npy', 'big_array_5_bmp.npy', {'median': 50.0, 'average': 50.0}), ('big_array_6.npy', 'big_array_6_bmp.npy', {'median': 60.0, 'average': 60.0}), ('big_array_7.npy', 'big_array_7_bmp.npy', {'median': 70.0, 'average': 70.0}), ('big_array_8.npy', 'big_array_8_bmp.npy', {'median': 80.0, 'average': 80.0}), ('big_array_9.npy', 'big_array_9_bmp.npy', {'median': 90.0, 'average': 90.0})]

Note

The median and average functions show two different ways to create local metadata and merge it into the BigMultiPipe metadata.