.. _install:

Installing BigMultiPipe
-----------------------

The `bigmultipipe` module can be installed with pip::

  pip install bigmultipipe

Source code development is conducted on
`GitHub <https://github.com/jpmorgen/BigMultiPipe>`_::

  git clone git://github.com/jpmorgen/BigMultiPipe.git

.. _use:

Discussion of use
-----------------

The `bigmultipipe` module provides tools that enable a flexible,
modular approach to constructing data processing pipelines that
optimize computer processing, memory, and disk I/O resources.  The
:class:`~bigmultipipe.BigMultiPipe` base class is subclassed by the
user to connect the file reading, file writing, and data processing
methods to the user's existing processing code.  The
:meth:`BigMultiPipe.pipeline() <bigmultipipe.BigMultiPipe.pipeline>`
method runs the pipeline, maximizing use of the host computer's
processing resources.  Keywords are available to tailor memory and
processor use, the most important of which is ``process_size``, the
maximum size in bytes of an individual process.  Two optional
keywords, ``pre_process_list`` and ``post_process_list``, can contain
lists of functions to be run on the data before and after the primary
processing step.  These keywords enable additional flexibility in the
creation and modification of the pipeline at object instantiation
and/or pipeline runtime.

.. _design:

Discussion of Design
--------------------

The `bigmultipipe` module uses a three-stream approach to address the
problem of parallel processing of large data structures.

Stream (1) is the data, which begins and ends on disk, thus
side-stepping the issues of inter-process communication discussed in
the `Background`_ section.

Stream (2) is control.  This stream is intended to control the
primary processing step, but can also control pre-processing,
post-processing, file name creation, and file writing.  The control
stream starts as the keywords that are provided to a
:class:`~bigmultipipe.BigMultiPipe` object on instantiation.  Using
Python's flexible ``**kwargs`` feature, these keywords can be
supplemented or overridden when the
:meth:`BigMultiPipe.pipeline() <bigmultipipe.BigMultiPipe.pipeline>`
method is called.  The functions in ``pre_process_list`` can
similarly supplement or override these keywords.

Finally, there is stream (3), the output metadata.  Stream (3) is
returned to the caller along with the output filename of each
processed file for use in subsequent processing steps.  Stream (3)
can be used to minimize the number of times the large output data
files are re-read during subsequent processing.  As discussed in the
`Background`_ section, the amount of information returned as metadata
should be modest in size.  The :func:`bigmultipipe.bmp_cleanup`
function can be used to remove large items from the metadata, if
necessary.  As discussed in the notes of that function, the ability
to store and remove large metadata items in each process's metadata
stream can be used to assist in building modular pipeline processing
systems.  Alternately, large metadata items not intended for return
to the calling process can be stored in an object passed via the
control stream (i.e., as a keyword of
:class:`~bigmultipipe.BigMultiPipe` or
:meth:`BigMultiPipe.pipeline() <bigmultipipe.BigMultiPipe.pipeline>`).
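To make the three streams concrete, the sketch below shows how they
appear from the caller's point of view.  ``MyPipe`` and its keywords
are hypothetical placeholders standing in for a user-written
:class:`~bigmultipipe.BigMultiPipe` subclass like the ones developed
in the `Example`_ section::

  # Stream (1): data enter and leave the pipeline as files on disk
  in_names = ['big_file1.npy', 'big_file2.npy']

  # Stream (2): control keywords, set at instantiation...
  mp = MyPipe(some_option=3, outdir='/tmp')
  # ...and supplemented or overridden when the pipeline is run
  pout = mp.pipeline(in_names, other_option=2)

  # Stream (3): each pipeline product is an (output filename,
  # metadata dict) pair, so the large files need not be re-read
  outnames, metadata = zip(*pout)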
.. _background:

Background
----------

The parallel pipeline processing of large data structures is best
done in a multithreaded environment, which enables the data to be
easily shared between threads executing in the same process.
Unfortunately, Python's `Global Interpreter Lock (GIL)`_ prevents
multiple threads from running at the same time, except in certain
cases, such as I/O wait and some `numpy` array operations.

Python's `multiprocessing` module provides a partial solution to the
GIL dilemma.  The `multiprocessing` module launches multiple
independent Python processes, thus providing true concurrent parallel
processing in a way that does not depend on the underlying code being
executed.  The `multiprocessing` module also provides tools such as
:func:`multiprocessing.Pipe` and :class:`multiprocessing.Queue` that
enable communication between these processes.  Unfortunately, these
inter-process communication solutions are not quite as flexible as
shared memory between threads in one process because data must be
transferred.  The transfer is done using `pickle`: data are pickled
on one end of the pipe and unpickled on the other.  Depending on the
complexity and size of the object, the pickle/unpickle process can be
very inefficient.

The `bigmultipipe` module provides a basic framework for avoiding all
of these problems by implementing the three-stream approach described
in the `Discussion of Design`_ section.  Inter-process communication
requiring `pickle` still occurs; however, only filenames and
(hopefully) modest-sized metadata are exchanged in this way.

.. _Global Interpreter Lock (GIL): https://wiki.python.org/moin/GlobalInterpreterLock

Statement of scope
------------------

This module is best suited for the simple case of a "straight
through" pipeline: one input file to one output file.  For more
complicated pipeline topologies, the `MPipe`_ module may be useful.
For parallel processing of loops that include certain `numpy`
operations and other optimization tools, `numba`_ may be useful.
Although it has yet to be tested, `bigmultipipe` should be mutually
compatible with either or both of these other packages, although the
:class:`bigmultipipe.NoDaemonPool` version of
:class:`multiprocessing.pool.Pool` may need to be used if multiple
levels of multiprocessing are being conducted.

.. _MPipe: https://vmlaker.github.io/mpipe/

.. _numba: https://numba.pydata.org/
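As a rough illustration of that last point, the sketch below assumes
:class:`bigmultipipe.NoDaemonPool` can be used as a drop-in
replacement for :class:`multiprocessing.pool.Pool` when an outer pool
of workers must itself be allowed to start subprocesses.  The
``run_one_pipeline`` function and ``lists_of_files`` sequence are
hypothetical placeholders, not part of the `bigmultipipe` API::

  from bigmultipipe import NoDaemonPool

  # Ordinary Pool workers are daemonic and may not spawn children;
  # NoDaemonPool workers may, so each one can run its own
  # BigMultiPipe pipeline (a second level of multiprocessing)
  with NoDaemonPool(processes=2) as pool:
      results = pool.map(run_one_pipeline, lists_of_files)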
.. _example:

Example
-------

The following code shows how to develop a `bigmultipipe` pipeline
starting from code that processes large files one at a time in a
simple for loop.

First the `for` loop case:

>>> import os
>>> from tempfile import TemporaryDirectory, TemporaryFile
>>> import numpy as np
>>> # Write some large files
>>> with TemporaryDirectory() as tmpdirname:
>>>     in_names = []
>>>     for i in range(10):
>>>         outname = f'big_array_{i}.npy'
>>>         outname = os.path.join(tmpdirname, outname)
>>>         a = i + np.zeros((1000,2000))
>>>         np.save(outname, a)
>>>         in_names.append(outname)
>>>
>>>     # Process with traditional for loop
>>>     reject_value = 2
>>>     boost_target = 3
>>>     boost_amount = 5
>>>     outnames = []
>>>     meta = []
>>>     for f in in_names:
>>>         # File read step
>>>         data = np.load(f)
>>>         # Pre-processing steps
>>>         if data[0,0] == reject_value:
>>>             continue
>>>         if data[0,0] == boost_target:
>>>             flag_to_boost_later = True
>>>         else:
>>>             flag_to_boost_later = False
>>>         # Processing step
>>>         data = data * 10
>>>         # Post-processing steps
>>>         if flag_to_boost_later:
>>>             data = data + boost_amount
>>>         meta.append({'median': np.median(data),
>>>                      'average': np.average(data)})
>>>         outname = f + '_bmp'
>>>         np.save(outname, data)
>>>         outnames.append(outname)
>>> cleaned_innames = [os.path.basename(f) for f in in_names]
>>> cleaned_outnames = [os.path.basename(f) for f in outnames]
>>> cleaned_pout = zip(cleaned_innames, cleaned_outnames, meta)
>>> print(list(cleaned_pout))
[('big_array_0.npy', 'big_array_0.npy_bmp', {'median': 0.0, 'average': 0.0}), ('big_array_1.npy', 'big_array_1.npy_bmp', {'median': 10.0, 'average': 10.0}), ('big_array_2.npy', 'big_array_3.npy_bmp', {'median': 35.0, 'average': 35.0}), ('big_array_3.npy', 'big_array_4.npy_bmp', {'median': 40.0, 'average': 40.0}), ('big_array_4.npy', 'big_array_5.npy_bmp', {'median': 50.0, 'average': 50.0}), ('big_array_5.npy', 'big_array_6.npy_bmp', {'median': 60.0, 'average': 60.0}), ('big_array_6.npy', 'big_array_7.npy_bmp', {'median': 70.0, 'average': 70.0}), ('big_array_7.npy', 'big_array_8.npy_bmp', {'median': 80.0, 'average': 80.0}), ('big_array_8.npy', 'big_array_9.npy_bmp', {'median': 90.0, 'average': 90.0})]

Note that because ``big_array_2.npy`` was rejected, the input and
output filename lists are no longer the same length, so the pairs
printed above fall out of alignment starting at the rejected file.
The :func:`~bigmultipipe.prune_pout` convenience function used below
solves this problem.

Now let's parallelize with `bigmultipipe` a few different ways:

(1) Put all code into methods in a subclass of
:class:`~bigmultipipe.BigMultiPipe`:

>>> import os
>>> from tempfile import TemporaryDirectory, TemporaryFile
>>> import numpy as np
>>>
>>> from bigmultipipe import BigMultiPipe, prune_pout
>>>
>>> class DemoMultiPipe1(BigMultiPipe):
>>>
>>>     def file_read(self, in_name, **kwargs):
>>>         data = np.load(in_name)
>>>         return data
>>>
>>>     def file_write(self, data, outname, **kwargs):
>>>         np.save(outname, data)
>>>         return outname
>>>
>>>     def data_process_meta_create(self, data,
>>>                                  reject_value=None,
>>>                                  boost_target=None,
>>>                                  boost_amount=0,
>>>                                  **kwargs):
>>>         # Pre-processing steps
>>>         if reject_value is not None:
>>>             if data[0,0] == reject_value:
>>>                 return (None, {})
>>>         if (boost_target is not None
>>>             and data[0,0] == boost_target):
>>>             flag_to_boost_later = True
>>>         else:
>>>             flag_to_boost_later = False
>>>         # Processing step
>>>         data = data * 10
>>>         # Post-processing steps
>>>         if flag_to_boost_later:
>>>             data = data + boost_amount
>>>         meta = {'median': np.median(data),
>>>                 'average': np.average(data)}
>>>         return (data, meta)
>>> # Write large files and process with DemoMultiPipe1
>>> with TemporaryDirectory() as tmpdirname:
>>>     in_names = []
>>>     for i in range(10):
>>>         outname = f'big_array_{i}.npy'
>>>         outname = os.path.join(tmpdirname, outname)
>>>         a = i + np.zeros((1000,2000))
>>>         np.save(outname, a)
>>>         in_names.append(outname)
>>>
>>>     dmp = DemoMultiPipe1(boost_target=3, outdir=tmpdirname)
>>>     pout = dmp.pipeline(in_names, reject_value=2,
>>>                         boost_amount=5)
>>>
>>> # Prune ``None`` outnames and remove directory for display
>>> pruned_pout, pruned_in_names = prune_pout(pout, in_names)
>>> pruned_outnames, pruned_meta = zip(*pruned_pout)
>>> pruned_outnames = [os.path.basename(f) for f in pruned_outnames]
>>> pruned_in_names = [os.path.basename(f) for f in pruned_in_names]
>>> pretty_print = zip(pruned_in_names, pruned_outnames, pruned_meta)
>>> print(list(pretty_print))
[('big_array_0.npy', 'big_array_0_bmp.npy', {'median': 0.0, 'average': 0.0}), ('big_array_1.npy', 'big_array_1_bmp.npy', {'median': 10.0, 'average': 10.0}), ('big_array_3.npy', 'big_array_3_bmp.npy', {'median': 35.0, 'average': 35.0}), ('big_array_4.npy', 'big_array_4_bmp.npy', {'median': 40.0, 'average': 40.0}), ('big_array_5.npy', 'big_array_5_bmp.npy', {'median': 50.0, 'average': 50.0}), ('big_array_6.npy', 'big_array_6_bmp.npy', {'median': 60.0, 'average': 60.0}), ('big_array_7.npy', 'big_array_7_bmp.npy', {'median': 70.0, 'average': 70.0}), ('big_array_8.npy', 'big_array_8_bmp.npy', {'median': 80.0, 'average': 80.0}), ('big_array_9.npy', 'big_array_9_bmp.npy', {'median': 90.0, 'average': 90.0})]

.. note:: We override
   :meth:`~bigmultipipe.BigMultiPipe.data_process_meta_create`
   because we are both processing data *and* creating metadata.

.. note:: The ``outname_append`` parameter and
   :meth:`~bigmultipipe.BigMultiPipe.outname_create` method of
   :class:`~bigmultipipe.BigMultiPipe` make it easy to tailor the
   look of the output filenames.  The convenience function
   :func:`~bigmultipipe.prune_pout` makes it easy to keep the input
   and output filename lists synchronized when files are rejected.
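The demonstration arrays here are small, but for genuinely large
files the ``process_size`` keyword described in `Discussion of use`_
tells `bigmultipipe` how large each individual process is expected to
be so that memory use can be tailored accordingly.  A minimal sketch,
continuing the example above and assuming ``process_size`` can be
given at instantiation like the other control-stream keywords, with a
rough per-file memory estimate::

  # Rough per-process memory estimate for this example: a few copies
  # of one 1000x2000 array of 8-byte floats
  one_process_bytes = 3 * 1000 * 2000 * 8

  dmp = DemoMultiPipe1(boost_target=3, outdir=tmpdirname,
                       process_size=one_process_bytes)
  pout = dmp.pipeline(in_names, reject_value=2, boost_amount=5)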
(2) Let's use the ``pre_process_list`` and ``post_process_list``
parameters.  This allows us to assemble a pipeline at object
instantiation time or pipeline run time:

>>> import os
>>> from tempfile import TemporaryDirectory, TemporaryFile
>>> import numpy as np
>>>
>>> from bigmultipipe import BigMultiPipe, prune_pout
>>>
>>> def reject(data, reject_value=None, **kwargs):
>>>     """Example pre-processing function to reject data"""
>>>     if reject_value is None:
>>>         return data
>>>     if data[0,0] == reject_value:
>>>         # --> Return data=None to reject data
>>>         return None
>>>     return data
>>>
>>> def boost_later(data, boost_target=None, boost_amount=None, **kwargs):
>>>     """Example pre-processing function that shows how to alter kwargs"""
>>>     if boost_target is None or boost_amount is None:
>>>         return data
>>>     if data[0,0] == boost_target:
>>>         add_kwargs = {'need_to_boost_by': boost_amount}
>>>         retval = {'bmp_data': data,
>>>                   'bmp_kwargs': add_kwargs}
>>>         return retval
>>>     return data
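>>> # boost_later returns a dict with 'bmp_data' and 'bmp_kwargs' keys;
>>> # the extra 'need_to_boost_by' keyword it adds is picked up by the
>>> # later_booster post-processing function defined next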
>>>
>>> def later_booster(data, need_to_boost_by=None, **kwargs):
>>>     """Example post-processing function.
>>>     Interprets keyword set by boost_later"""
>>>     if need_to_boost_by is not None:
>>>         data = data + need_to_boost_by
>>>     return data
>>>
>>> def median(data, bmp_meta=None, **kwargs):
>>>     """Example metadata generator"""
>>>     median = np.median(data)
>>>     if bmp_meta is not None:
>>>         bmp_meta['median'] = median
>>>     return data
>>>
>>> def average(data, bmp_meta=None, **kwargs):
>>>     """Example metadata generator"""
>>>     av = np.average(data)
>>>     local_meta = {'average': av}
>>>     if bmp_meta is not None:
>>>         bmp_meta.update(local_meta)
>>>     return data
>>>
>>> class DemoMultiPipe2(BigMultiPipe):
>>>
>>>     def file_read(self, in_name, **kwargs):
>>>         data = np.load(in_name)
>>>         return data
>>>
>>>     def file_write(self, data, outname, **kwargs):
>>>         np.save(outname, data)
>>>         return outname
>>>
>>>     def data_process(self, data, **kwargs):
>>>         return data * 10
>>>
>>> # Write large files and process with DemoMultiPipe2
>>> with TemporaryDirectory() as tmpdirname:
>>>     in_names = []
>>>     for i in range(10):
>>>         outname = f'big_array_{i}.npy'
>>>         outname = os.path.join(tmpdirname, outname)
>>>         a = i + np.zeros((1000,2000))
>>>         np.save(outname, a)
>>>         in_names.append(outname)
>>>
>>>     # Create a pipeline using the pre- and post-processing
>>>     # components defined above.  This enables the pipeline to be
>>>     # assembled at instantiation and controlled at either
>>>     # instantiation or runtime
>>>     dmp = DemoMultiPipe2(pre_process_list=[reject, boost_later],
>>>                          post_process_list=[later_booster, median, average],
>>>                          boost_target=3, outdir=tmpdirname)
>>>     pout = dmp.pipeline(in_names, reject_value=2,
>>>                         boost_amount=5)
>>>
>>> # Prune ``None`` outnames and remove directory for display
>>> pruned_pout, pruned_in_names = prune_pout(pout, in_names)
>>> pruned_outnames, pruned_meta = zip(*pruned_pout)
>>> pruned_outnames = [os.path.basename(f) for f in pruned_outnames]
>>> pruned_in_names = [os.path.basename(f) for f in pruned_in_names]
>>> pretty_print = zip(pruned_in_names, pruned_outnames, pruned_meta)
>>> print(list(pretty_print))
[('big_array_0.npy', 'big_array_0_bmp.npy', {'median': 0.0, 'average': 0.0}), ('big_array_1.npy', 'big_array_1_bmp.npy', {'median': 10.0, 'average': 10.0}), ('big_array_3.npy', 'big_array_3_bmp.npy', {'median': 35.0, 'average': 35.0}), ('big_array_4.npy', 'big_array_4_bmp.npy', {'median': 40.0, 'average': 40.0}), ('big_array_5.npy', 'big_array_5_bmp.npy', {'median': 50.0, 'average': 50.0}), ('big_array_6.npy', 'big_array_6_bmp.npy', {'median': 60.0, 'average': 60.0}), ('big_array_7.npy', 'big_array_7_bmp.npy', {'median': 70.0, 'average': 70.0}), ('big_array_8.npy', 'big_array_8_bmp.npy', {'median': 80.0, 'average': 80.0}), ('big_array_9.npy', 'big_array_9_bmp.npy', {'median': 90.0, 'average': 90.0})]

.. note:: The ``median`` and ``average`` functions show two different
   ways to create local metadata and merge it into the
   `BigMultiPipe` metadata.
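As a final illustration of the metadata stream discussed in
`Discussion of Design`_, the returned metadata can drive a subsequent
selection step without re-reading the large output files.  A minimal
sketch, continuing from the example above with an arbitrary
threshold::

  # Select output files whose median exceeds a threshold using only
  # the stream (3) metadata; the large arrays are not reloaded
  bright = [fname for fname, m in zip(pruned_outnames, pruned_meta)
            if m['median'] > 50]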