4.11. Extending and Creating Pipelines

It is possible to leverage the pipeline software framework and command-line tools available within Helisphere to extend an existing pipeline or create a new one. It is even possible to use Helisphere’s pipeline capabilities to assemble a pipeline composed of custom tools external to Helisphere.

To extend an existing pipeline, you should be familiar with running and analyzing the pre-built pipelines, and you must have a passing knowledge of Python and general object-oriented concepts. To build an entirely new pipeline, or to incorporate external tools as part of a pipeline, you should also be familiar with the SCons build framework, at the level of understanding its Construction Environment and Custom Builder features (http://scons.org).

4.11.1. Pipeline Inheritance Hierarchy

All pipelines currently perform the same functions upon initialization, but differ in the steps performed on the set of reads to be analyzed. The shared initialization steps and the iteration over the set of input files are handled for each pipeline by the pipeline base class defined in the pypeline.pipelines.base module, found in <helisphere_install_dir>/pypeline/pipelines/base.py. This base class defines an abstract method, analyzeGroup, which must be overridden in every derived class. analyzeGroup defines the analysis steps to be performed on a given set of reads within an sms file. Additionally, each pipeline you create must define the member variable globals, a list of the variables that must be defined in the Global section of the pipeline’s INI-style configuration file.
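The contract described above can be pictured with a small stand-in. The sketch below is not the code in base.py: the names PipelineSketch and DemoPipeline, the simplified execute loop, the way the group name is derived, and the use of plain strings in globals are all assumptions made for illustration. It only mirrors the documented behavior that analyzeGroup raises NotImplementedError until a derived class overrides it.

```python
class PipelineSketch(object):
    """Hypothetical stand-in for the documented base class (not the real one)."""

    # Names the [Global] config section must define. Plain strings here;
    # the real framework uses Parameter objects.
    globals = []

    def analyzeGroup(self, sms_file, group, env):
        # Abstract: derived pipelines must supply the analysis steps.
        raise NotImplementedError("analyzeGroup must be overridden")

    def execute(self, sms_files, env=None):
        # Shared driver: visit every input file and analyze it as one group.
        results = []
        for sms_file in sms_files:
            group = sms_file.rsplit(".", 1)[0]  # assumed group naming
            results.append(self.analyzeGroup(sms_file, group, env))
        return results


class DemoPipeline(PipelineSketch):
    globals = ["input", "outdir"]

    def analyzeGroup(self, sms_file, group, env):
        # A real pipeline would invoke analysis tools through env here.
        return "analyzed %s as group %s" % (sms_file, group)


print(DemoPipeline().execute(["run1.sms", "run2.sms"]))
```

Calling execute on PipelineSketch itself fails with NotImplementedError, which is the same failure the real base class is documented to raise when analyzeGroup is not overridden.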

4.11.2. SCons Framework Overview

The current pipeline framework relies on the Python-based SCons build framework to determine build order and file organization, and to provide parallelism and re-entrant capabilities. Each pipeline is simply a SCons construction file which specifies a set of analysis steps to be executed and, for each analysis step, specifies what the source (input) is and what the target (output) is. Based on this, SCons determines the order in which steps should be executed. Because SCons is Python-based, the construction scripts can contain any valid Python code, including loops and conditional statements.

Just as each pipeline is a SCons construction file, each analysis step is simply a SCons custom builder which is responsible for executing the analysis given a source (an input or set of inputs). This analysis will produce one or more target files, which the builder may rename or manage as appropriate before returning them to the pipeline. The next analysis step in the pipeline can then specify the returned target as its source, and so forth. Because SCons determines execution order, when extending a pipeline you can define the analysis steps without having to worry about specifying them in any particular order, as long as your source and target files accurately reflect the dependencies between the analysis steps. However, because execution order is determined by SCons, you cannot access target files that are created during pipeline execution except through a SCons builder. If you find that you need to access target files within the pipeline before they are passed as input to another step, for example to parse or modify them, then you will need to write a custom builder to do this and invoke that builder from within the pipeline.
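To see why the declaration order does not matter, consider the toy resolver below. It is not how SCons is implemented; it is a minimal sketch, with hypothetical file names, that derives an execution order purely from the source and target files each step declares.

```python
def execution_order(steps):
    """Order steps so that each runs after the steps producing its sources.

    steps maps a step name to a (sources, targets) pair of file-name lists.
    """
    producer = {}
    for name, (_sources, targets) in steps.items():
        for target in targets:
            producer[target] = name

    ordered, visiting, done = [], set(), set()

    def visit(name):
        if name in done:
            return
        if name in visiting:
            raise ValueError("dependency cycle involving %s" % name)
        visiting.add(name)
        for source in steps[name][0]:
            if source in producer:  # files no step produces are plain inputs
                visit(producer[source])
        visiting.discard(name)
        done.add(name)
        ordered.append(name)

    for name in list(steps):
        visit(name)
    return ordered


# Steps deliberately declared in reverse; file names are hypothetical.
steps = {
    "LengthTool": (["filtered.sms"], ["lengths.txt"]),
    "FilterSMS": (["sample.sms"], ["filtered.sms"]),
    "ExtractSMS": (["input.sms"], ["sample.sms"]),
}
print(execution_order(steps))  # ['ExtractSMS', 'FilterSMS', 'LengthTool']
```

The same sketch also shows why the declared sources and targets must be accurate: a step whose source is not declared as another step's target is treated as having no dependency on it.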

For more information on SCons and developing your own custom builders, see Section 4.11.9, “Wrapping New Tools”.

4.11.3. Creating a New Pipeline

To create a new pipeline, you would simply extend from the Pipeline class in the pypeline.pipelines.base module, and define the set of global variables allowed in your new pipeline configuration file’s Global section. You will then need to implement the analyzeGroup function, by specifying the set of analysis functions to perform. Finally, you can query the sqlite database output from a pipeline analysis in order to summarize your results.

The development area for your new pipeline can be located anywhere that can access the Helisphere install directory. It is not recommended to do any development of new pipelines within the Helisphere install directory.

  • To create a new pipeline, first set your $PYTHONPATH environment variable so that the pypeline module is importable from wherever you are developing your new pipeline:

    $ export PYTHONPATH=<helisphere_install_dir>:$PYTHONPATH

    For example, if you used the default Helisphere install directory the command would be

    $ export PYTHONPATH=/usr/local/helisphere:$PYTHONPATH
  • Next, open your new pipeline file as <new_pipeline_name>.py and import the following modules:

    from pypeline.util.param import Parameter
    from pypeline.pipelines.base import Pipeline

    These modules provide access to the Parameter class so that you can define the necessary Global variables for your new pipeline, and give you access to the Pipeline base class.

  • The next step is to define your new pipeline class. We recommend that you inherit from the base pipeline class, to leverage as much existing functionality as possible. First, inherit from the base class and define those variables which your new pipeline expects to encounter in the Global section of the INI-style configuration file:

    class MyNewPipeline(Pipeline):
    
        globals = [Parameter('channels', 'Global.channels', 'Channels to be processed.  Default is all channels'),
                   Parameter('input', 'Global.input', 'Input files to be processed.  May contain wildcards as in Unix (*.sms)'),
                   Parameter('outdir', 'Global.outdir', 'Output directory'),
                   Parameter('minCountScore', 'Global.minCountScore', 'Threshold value of normalized alignment score of strands to include in expressed transcript analysis')]
  • Then define the analysis steps you wish to execute on each set of reads in the analyzeGroup function. The available analysis functions are listed in Section 4.11.8, “Analysis Tool List”. Each analysis function should be invoked through the env function argument, and each returns the list of files it created as its targets, as SCons Node objects. All analysis functions take a source parameter and typically also a group parameter, which specifies the prefix used to name output files so that the results from each set of reads are easily distinguishable. Analysis functions which take a --output-file command-line parameter also typically require a target parameter, which gives the base name passed as the value of the --output-file parameter. Output files generated by the analysis steps are stored in directories bearing the name of the analysis function (e.g. ExtractSMS, FilterSMS). The returned list can then be used as the source for another analysis step:

    def analyzeGroup(self, sms_file, group, fcch, env):
    
        sampledReads = env.ExtractSMS(source=sms_file, target='sample.sms', group=group)
    
        filteredSMS = env.FilterSMS(source=sampledReads, target='filtered.sms', group=group)
  • If you want to perform the same analysis step twice, either on a different set of sources or using different parameter settings from the INI-style configuration file, then you should also specify a NAME parameter, which will be used as the name of the output directory in which to store results. For example, the following will result in two output directories, one named LengthTool and one named LengthToolForGrowth:

    env.LengthTool(source=filteredForGrowth,
                   channels=thisChannel,
                   group=group,
                   NAME="LengthToolForGrowth")
    
    env.LengthTool(source=filtered,
                   channels=thisChannel,
                   group=group)

    Since SCons construction files can contain any valid Python code, it is also possible to include loops or conditional statements within your pipeline:

            if Global == 'True':
    
                # split reads by barcode
                palmerOutput = env.Palmer(source=filteredReads,
                                          group=group )
    
                discardedReads = palmerOutput[1]
                separatedReads = palmerOutput[2:]
                numBarcodes = len(separatedReads)
    
                for bc in range(numBarcodes):
    
                    # get new group name
                    group=os.path.basename(str(separatedReads[bc]))
                    group=group[:-4]
    
                    # refilter for length
                    refilteredReads, filterStats = env.FilterSMS(target='filtered.sms',
                                                                 source=separatedReads[bc],
                                                                 NAME='FilterSMS-PostBC',
                                                                 group=group)
    
                    # continue analysis per barcode
                    self.analyzeSubGroup(refilteredReads, group, fcch, env)
    
            else:
                self.analyzeSubGroup(filteredReads, group, fcch, env)
  • Refer to Section 4.11.4, “Reports” for any custom reporting requirements in the new pipeline.
  • Refer to Section 4.11.7, “Running Custom Pipelines” for instructions on how to run the new pipeline.
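The chaining of analysis steps and the effect of the NAME parameter described in the steps above can be tried out without Helisphere by using a stand-in for env. Everything in the sketch below is hypothetical: RecordingEnv runs no analysis, and the path layout it returns (<step directory>/<group>.<target>) is an assumption chosen only to make the directory naming visible.

```python
class RecordingEnv(object):
    """Hypothetical stand-in for the env argument; it runs no analysis."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, tool):
        # Only reached for attributes not found normally, i.e. tool names.
        if tool.startswith("__"):
            raise AttributeError(tool)

        def builder(source, group, target="out", NAME=None, **params):
            step_dir = NAME or tool  # NAME overrides the default directory
            self.calls.append((tool, step_dir))
            # Assumed layout: <step directory>/<group>.<target>
            return ["%s/%s.%s" % (step_dir, group, target)]

        return builder


def analyze_group(sms_file, group, env):
    # Each call returns a list of targets that feeds the next step.
    sampled = env.ExtractSMS(source=sms_file, target="sample.sms", group=group)
    filtered = env.FilterSMS(source=sampled, target="filtered.sms", group=group)
    env.LengthTool(source=filtered, group=group)
    return env.LengthTool(source=filtered, group=group,
                          NAME="LengthToolForGrowth")


env = RecordingEnv()
print(analyze_group("run1.sms", "run1", env))
print(env.calls)
```

The recorded calls show LengthTool being invoked twice with two distinct step directories, which is the situation the NAME parameter exists to handle.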

4.11.4. Reports

It is crucially important to understand that since the pipeline system is a SCons construction file, SCons reads the pipeline files and determines build order before executing any build steps. For example, any print statements or functions which do not execute builders will get executed during this read phase before any analysis steps are actually executed. Therefore, you cannot access files that will be output from the analysis steps unless you do so through a SCons custom builder. For this reason, we recommend that you perform all summarizing of output files through a standalone script after the pipeline has finished execution unless you are comfortable writing custom SCons builders.
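The two-phase behavior can be reproduced in miniature. The sketch below does not use SCons; declare_step is a hypothetical stand-in for calling a builder, and the file name is made up. It shows that code in the script body runs while steps are still only being declared, so an output file does not exist yet at that point.

```python
import os
import tempfile

workdir = tempfile.mkdtemp()
output = os.path.join(workdir, "filtered.sms")
pending = []  # actions registered during the read phase
log = []


def declare_step(action):
    # Stand-in for calling a builder: nothing runs yet, the work is queued.
    pending.append(action)


def write_output():
    with open(output, "w") as handle:
        handle.write("reads\n")


# Read phase: the whole script body runs top to bottom.
declare_step(write_output)
log.append(("read phase", os.path.exists(output)))  # file is not there yet

# Build phase: only now are the queued actions executed.
for action in pending:
    action()
log.append(("build phase", os.path.exists(output)))

print(log)  # [('read phase', False), ('build phase', True)]
```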

The simplest way to create a summary report is to write a standalone script that queries the sqlite database file, which is produced automatically for any pipeline you create, and to run that script after the pipeline has finished executing. The database file contains the results of parsing all the output files created during the execution of each analysis step. The analysis tools write to the following database tables, indexing their results by the name of the pipeline step and the group being analyzed:

  • Align2Txt

    • Tables: StrandScoreHistogram, PerfectStrandCount, ReferenceCounts
  • ErrorTool

    • Tables: ErrorByNucleotide, BaseCounts
  • FilterStats

    • Table: FilterStats
  • SMSls

    • Table: SMSContents
  • LengthTool

    • Tables: StrandLength, TermLoss, Growth
  • LengthToolLite

    • Table: StrandLength
  • TranscriptCount

    • Table: ParseCountStats

Data from the srfHeaders and the clipping file are stored in these tables:

  • SrfHeader

    • Table: CameraArea
  • ClippingFile

    • Table: Usable Area

The database file can be found in the pickle sub-directory of the pipeline output directory. Create a Python script, and import and query the database:

#!/usr/bin/python
import sqlite3

# Create a connection to the pipeline's database file
conn = sqlite3.connect('output/pickle/data.sqlite')
c = conn.cursor()

# List all tables in the database
c.execute("select name from sqlite_master where type='table'")
print(c.fetchall())

# Describe a table (replace tableName with one of the tables listed above)
c.execute('PRAGMA table_info(tableName)')
print(c.fetchall())

# Select all rows from a table
c.execute('select * from tableName')
print(c.fetchall())

# Close the connection when finished
conn.close()

For more information on the queries you can pass to the execute() function, see the sqlite documentation and the Python sqlite3 module documentation.
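Because the column layout of each table is not listed here, a useful first step is to ask the database itself. The helper below is a minimal sketch that relies only on standard SQLite features (sqlite_master and PRAGMA table_info). The function name describe_database is hypothetical, and the demonstration builds a throwaway database whose column names are invented; point the function at your own output/pickle/data.sqlite to see the real schema.

```python
import os
import sqlite3
import tempfile


def describe_database(path):
    """Return {table name: [column names]} for every table in the database."""
    conn = sqlite3.connect(path)
    try:
        cur = conn.cursor()
        cur.execute("select name from sqlite_master "
                    "where type='table' order by name")
        tables = [row[0] for row in cur.fetchall()]
        schema = {}
        for table in tables:
            # PRAGMA statements cannot take bound parameters, so the
            # identifier is quoted by hand (names come from sqlite_master).
            cur.execute('PRAGMA table_info("%s")' % table.replace('"', '""'))
            # Each row is (cid, name, type, notnull, dflt_value, pk).
            schema[table] = [row[1] for row in cur.fetchall()]
        return schema
    finally:
        conn.close()


# Throwaway database with invented columns, only to exercise the helper.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.sqlite")
demo = sqlite3.connect(demo_path)
demo.execute("create table FilterStats (step text, grp text, value real)")
demo.commit()
demo.close()

print(describe_database(demo_path))
```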

You can also choose to write a script which opens, parses and summarizes the output files directly, if you do not wish to make use of the sqlite database file.

However, should you wish to run reports from within the SCons framework, you can do so. You will need to create custom builders so that SCons can determine that the analysis steps which generate the outputs you want to summarize must be completed before the summaries are generated. For more information on creating custom builders, see Section 4.11.9, “Wrapping New Tools”. To generate reports from within the pipeline, you will need to override the report function and call the custom SCons report builders from within it. You can also use one of the predefined reports found in the module <helisphere_install_dir>/pypeline/builders/reports.py. For example, to use a predefined report, you could override report():

def report(self, env):
    env.ErrorReport()

4.11.5. Pipeline Configuration File

All pipelines require an INI-style configuration file to determine the values of the various parameters to pass to each analysis function. Once you have constructed your pipeline, execute the command below to generate a shell INI-style configuration file in the directory in which the command is invoked. Be sure to source <helisphere_install_dir>/helicos.bashrc first.

pypeline -p <new_pipeline_name>  --pipeline-path <location of new_pipeline>  --print-config

The shell configuration file will be named <new_pipeline_name>.config.template. For each analysis tool you have specified in the analyzeGroup function, the shell configuration file lists all the variables you can specify for that tool. By default, all variables except the [Global] parameters are commented out. Uncomment and fill in the parameters you wish to set, and either remove or leave commented out the remaining variables. Consult the individual documentation for each tool to make these determinations.

The pipeline supports variable interpolation in the configuration file, so it is possible to specify a parameter in the [Global] or [Site] section of your configuration file and use it to provide the value for other parameters throughout the file. The following example configuration file shows Global and Site parameters used in other configuration sections, as well as both commented-out and specified configuration values.

[Site]
useSGE = True
binaryPath=/install_dir/helisphere-1.0.389.151/bin
configDir=../../pypeline/config
referenceDir=/install_dir/helisphere-1.0.389.151/reference_data


[Global]
channels = 1:1-25,2:1-25
referenceName = 30-dT50-VT-Cy3-SSCy5
input = sms/*.sms
outdir = basic.output
minLength = 25
minScore = 4
bestOnly = True
globalAmbig = all
sample = False


[ExtractSMS-Sample]
sampleSize= 20000
#readPass=
#aligned=
#unaligned=
#common=
#diff=
#channels=


[FilterSMS]
minLength = $(Global.minLength)
maxQuality = 100
#trimLeadingTFreq1=
#removeLock=
trimLeadingTFreq = 2/0.75
#trimTailingAFreq=
#trimTailingCFreq=
trimRate= 2
#filterRate=
#trimBeg=
#trimEnd=
noControl= False
dinucFile = $(Site.configDir)/dinuc.txt
#sampleSize=
#align=
#minScore=


[IndexDPGenomic]
referenceDB = $(Site.referenceDir)/$(Global.referenceName).seed18
referenceFile = $(Site.referenceDir)/$(Global.referenceName).fasta
strands = both
mode = GL
bestOnly = $(Global.bestOnly)
minScore = $(Global.minScore)
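To make the interpolation rule concrete, the sketch below expands $(Section.key) references against a dictionary holding a few of the values from the example above. How the pipeline performs this substitution internally is not described here, so the resolve function, its regular expression, and its nesting limit are assumptions rather than the actual implementation.

```python
import re

# Matches references of the form $(Section.key)
REFERENCE = re.compile(r"\$\((\w+)\.(\w+)\)")


def resolve(value, config, depth=0):
    """Expand every $(Section.key) reference in value using config."""
    if depth > 10:
        raise ValueError("references nested too deeply (possible cycle)")

    def lookup(match):
        section, key = match.group(1), match.group(2)
        # A referenced value may itself contain references.
        return resolve(config[section][key], config, depth + 1)

    return REFERENCE.sub(lookup, value)


config = {
    "Site": {
        "referenceDir": "/install_dir/helisphere-1.0.389.151/reference_data",
    },
    "Global": {"referenceName": "30-dT50-VT-Cy3-SSCy5", "minScore": "4"},
    "IndexDPGenomic": {
        "referenceFile": "$(Site.referenceDir)/$(Global.referenceName).fasta",
        "minScore": "$(Global.minScore)",
    },
}

print(resolve(config["IndexDPGenomic"]["referenceFile"], config))
print(resolve(config["IndexDPGenomic"]["minScore"], config))
```

With these values, referenceFile expands to the reference directory followed by 30-dT50-VT-Cy3-SSCy5.fasta, and minScore expands to 4.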

4.11.6. Extending an Existing Pipeline

To extend a pipeline, follow the same steps as above, with these modifications:

  • When you define your class, import the pipeline you wish to extend, instead of pypeline.pipelines.base. For example, to extend the Basic pipeline:

    from pypeline.util.param import Parameter
    from pypeline.pipelines.basic import Basic
  • When you define your new class, inherit from the pipeline you wish to extend and redefine all your global variables. For example, to extend the basic pipeline:

    class BasicSort(Basic):
    
        globals = [Parameter('sample', 'Global.sample', "If set to 'True', process only a sample of reads "),
                   Parameter('input', 'Global.input', "Input files.  May contain wildcards as in Unix [*.sms]"),
                   Parameter('outdir', 'Global.outdir', 'Output directory'),
                   Parameter('referenceName', 'Global.referenceName', 'Name of the reference to be used.  The *.fasta file and processed database files for this reference should be located in Site.referenceDir'),
                   Parameter('minLength', 'Global.minLength', 'Minimum read length of reads to be processed'),
                   Parameter('minScore', 'Global.minScore', 'Minimum alignment score to be considered'),
                   Parameter('bestOnly', 'Global.bestOnly', 'Whether only highest scoring alignments are to be kept'),
                   Parameter('globalAmbig', 'Global.globalAmbig', 'In case of ambiguous alignments, which alignments should be kept (all/rand/none)')]
  • When you define your analyzeGroup function, you will want to call the base class’s analyzeGroup function as well.

    def analyzeGroup(self, sms_file, group, fcch, env):
    
        # Call the analyzeGroup method of the inherited pipeline
        Basic.analyzeGroup(self, sms_file, group, fcch, env)
    
        # New functionality starts here
  • Prior to adding new functionality, it is a good idea to run the pipeline as is. This will provide a baseline as new features are added to the pipeline. Refer to Section 4.11.7, “Running Custom Pipelines” for steps on how to execute the pipeline.
  • In analyzeGroup, to access targets created by the pipeline you are inheriting from, you will want to use the queryTargets function. The queryTargets function returns a list of filenames representing the target outputs generated for a particular analysis step and group of reads. You need to provide the queryTargets function with the name of the analysis step whose outputs you want, the group name whose outputs you want, and optionally, a regular expression if you want to filter by file extension. For instance:

    # Get the outputs of the 'ExtractSMS-Aligned' step from the base pipeline
    alignedReads = self.queryTargets(group, 'ExtractSMS-Aligned')
    
    # Get the outputs of the 'FilterAlign' step from the base pipeline whose outputs include 'bin' in the file name
    filteredAlignments = self.queryTargets(group, 'FilterAlign', 'bin')
  • The targets that you access via the queryTargets command from the inherited pipeline can then be used in additional processing. For example, this pipeline derived from the Basic pipeline will generate sorted alignments from the filtered alignments of the Basic pipeline.

    def analyzeGroup(self, sms_file, group, fcch, env):
    
        # Call the analyzeGroup method of the inherited pipeline
        Basic.analyzeGroup(self, sms_file, group, fcch, env)
    
        # Get the binary outputs of the 'FilterAlign' step from the base pipeline
        filteredAlignments = self.queryTargets(group, 'FilterAlign','.bin')
    
        # Sort the filtered alignments
        sortedAlignments = env.SortAlign(target='sorted.align', source=filteredAlignments, group=group)

    Because the pipelines are run within a SCons framework, it does not matter in what order you specify your analysis steps, as long as every source you specify either already exists or is the target of another step, and the steps contain no circular dependencies. Therefore, if you want to extend a pipeline by adding a parallel analysis step to the middle or adding extra steps to the end, you can do so easily by invoking analysis tools via the env variable, as you did when creating a new pipeline.

  • If you do not override the report function, the reports defined for the pipeline you are extending will be created. If you choose to override the report function, you will not generate these reports unless you call the report function of the pipeline you are inheriting from explicitly:

    def report(self, env):
        Basic.report(self, env)
        env.MyCustomReport(target='mycustomreport.summary.txt')
  • An example of a simple pipeline that derives from the Basic pipeline is shown below. Note that this pipeline does not override the report function, so the reporting will be identical to the inherited pipeline’s reporting:

    from pypeline.util.param import Parameter
    from pypeline.pipelines.basic import Basic
    
    class BasicSort(Basic):
        globals = [Parameter('sample', 'Global.sample', "If set to 'True', process only a sample of reads "),
                   Parameter('input', 'Global.input', "Input files.  May contain wildcards as in Unix [*.sms]"),
                   Parameter('outdir', 'Global.outdir', 'Output directory'),
                   Parameter('referenceName', 'Global.referenceName', 'Name of the reference to be used.  The *.fasta file and processed database files for this reference should be located in Site.referenceDir'),
                   Parameter('minLength', 'Global.minLength', 'Minimum read length of reads to be processed'),
                   Parameter('minScore', 'Global.minScore', 'Minimum alignment score to be considered'),
                   Parameter('bestOnly', 'Global.bestOnly', 'Whether only highest scoring alignments are to be kept'),
                   Parameter('globalAmbig', 'Global.globalAmbig', 'In case of ambiguous alignments, which alignments should be kept (all/rand/none)')]
    
        def analyzeGroup(self, sms_file, group, fcch, env):
    
            # Call the analyzeGroup method of the inherited pipeline
            Basic.analyzeGroup(self, sms_file, group, fcch, env)
    
            # Get the binary outputs of the 'FilterAlign' step from the base pipeline
            filteredAlignments = self.queryTargets(group, 'FilterAlign','.bin')
    
            # Sort the filtered alignments
            sortedAlignments = env.SortAlign(target='sorted.align', source=filteredAlignments, group=group)
  • It is crucially important to understand that since the pipeline system is a SCons construction file, SCons reads the pipeline files and determines build order before executing any build steps. For example, any print statements or functions which do not execute builders will get executed during this read phase before any analysis steps are actually executed. Therefore, you cannot access files that will be output from the analysis steps unless you do so through a SCons custom builder. For this reason, we recommend that you perform all custom summarizing of output files through a standalone script after the pipeline has finished execution.
  • See Section 4.11.4, “Reports” for more details on how to write such a script, or Section 4.11.9, “Wrapping New Tools” for information on creating custom SCons report builders.
  • Refer to Section 4.11.7, “Running Custom Pipelines” for instructions on how to run the new pipeline.
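The optional third argument of queryTargets is described above as a regular expression, which has a practical consequence: an unescaped dot matches any character. The sketch below is not the queryTargets implementation. The function filter_targets is hypothetical, the file names are invented, and it assumes the pattern is searched for anywhere in the file name. It compares the pattern '.bin' with the stricter r'\.bin$'.

```python
import re


def filter_targets(filenames, ext=None):
    """Keep the file names in which the regular expression ext is found."""
    if ext is None:
        return list(filenames)
    pattern = re.compile(ext)
    return [name for name in filenames if pattern.search(name)]


# Invented target names for illustration.
targets = [
    "FilterAlign/run1.filtered.bin",
    "FilterAlign/run1.filtered.txt",
    "FilterAlign/run1.robin.txt",
]

loose = filter_targets(targets, ".bin")      # '.' matches any character
strict = filter_targets(targets, r"\.bin$")  # literal '.bin' at the end only

print(loose)
print(strict)
```

Under these assumptions the loose pattern also keeps run1.robin.txt, while the escaped and anchored pattern keeps only the file that really ends in .bin. If your output directories can contain similarly named files, the stricter form may be the safer choice.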

4.11.7. Running Custom Pipelines

To run a new or extended custom pipeline, follow these steps:

  • Create a pipeline site configuration file in your custom pipeline directory. Use the pipeline site configuration file from the Helisphere install directory as a template. For example, if your custom pipeline directory is ~/myPypeline, you may want to create a config subdirectory to store the site configuration file:

    cd ~/myPypeline
    mkdir config
    cd config
    cp <helisphere_install_dir>/pypeline/config/pypeline-site.conf .
  • Edit the custom pipeline site configuration file so that configDir points to the location where your configuration files will be stored. It is recommended that you do not store custom configurations in the Helisphere install directory. For example, user jdoe may store the configuration files in a subdirectory of the custom pipeline directory:

    configDir=/home/jdoe/myPypeline/config
  • It is recommended that you use a directory hierarchy for storing results and data similar to that of the existing pipelines, as described in Chapter 3, Analysis Workflows.
  • Generate a pipeline configuration file using the steps described in Section 4.11.5, “Pipeline Configuration File”
  • Execute the pipeline by specifying the pipeline path, the run configuration file, and the custom pipeline site configuration file. For example, the following command executes the custom basicSort pipeline using the basicSort.conf run configuration file in the current directory and the custom pipeline site configuration file /home/jdoe/myPypeline/config/pypeline-site.conf:

    pypeline -p basicSort --pipeline-path /home/jdoe/myPypeline/ -c basicSort.conf -l /home/jdoe/myPypeline/config/pypeline-site.conf

4.11.8. Analysis Tool List

Align2Sam
Align2Txt
AlignDepReads
AlignPairsAsym
BinReads
CoverageTool
ErrorTool
ExtractSMS
FilterAlign
FilterSMS
IndexDPGenomic
LengthToolLite
LengthTool
Palmer
SMS2Txt
SMSInfo
SMSls
SNPSniffer
SortAlign
TranscriptCount

4.11.9. Wrapping New Tools

If you need to incorporate new tools into Helisphere’s pipeline framework, you will need to implement these tools as SCons custom builders. Examine the file <helisphere_install_dir>/pypeline/builders/cppframework.py for concrete examples of how to do this for analysis tools, and read the SCons man page and User Guide, specifically the portion about custom builders (http://scons.org). Additionally, you will likely want to parse your tool’s output and create sqlite database tables, so you should examine <helisphere_install_dir>/pypeline/builders/parse.py and <helisphere_install_dir>/pypeline/builders/reports.py, as well as the parse functions defined in <helisphere_install_dir>/pypeline/builders/cppframework.py, to understand how to write custom builders for custom reports.
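As a starting point, the sketch below shows the general shape of a Python action function in the (target, source, env) form that SCons calls for a custom builder. The tool itself (counting lines in a file), the name count_reads_action, and the builder name CountReads are hypothetical, and the way Helisphere registers and names its own builders in cppframework.py may differ. The registration lines are shown only as comments because they need a SCons construction environment; the function is then called directly with plain paths so that it can be tried outside SCons.

```python
import os
import tempfile


def count_reads_action(target, source, env):
    """Python action in the (target, source, env) form that SCons calls.

    Writes the number of lines in the first source to the first target.
    str() turns SCons Node objects, or plain paths, into file names.
    """
    with open(str(source[0])) as handle:
        count = sum(1 for _ in handle)
    with open(str(target[0]), "w") as out:
        out.write("%d\n" % count)
    return 0  # zero (or None) tells SCons the action succeeded


# Inside a SCons construction file, registration could look like this:
#   env.Append(BUILDERS={"CountReads": Builder(action=count_reads_action)})
#   counts = env.CountReads(target="reads.count", source="reads.txt")

# Direct call with plain paths, just to exercise the action outside SCons.
workdir = tempfile.mkdtemp()
reads = os.path.join(workdir, "reads.txt")
with open(reads, "w") as handle:
    handle.write("ACGT\nTTGA\nCCAT\n")
result = os.path.join(workdir, "reads.count")
status = count_reads_action([result], [reads], env=None)

with open(result) as handle:
    print(status, handle.read().strip())
```

Keeping the file handling inside the action is what lets SCons run it at build time, after the steps that produce its sources have finished.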

4.11.10. Class Documentation

Help on method execute in module pypeline.pipelines.base:

execute(self, multiChannel=False) unbound pypeline.pipelines.base.Pipeline method
    This function launches the pipeline analysis.
    
    Keyword arguments:
    multiChannel --  boolean indicating whether multi-channel sms files are allowable input (default False)
    
    This function iterates over the input file(s), and calls analyzeGroup() on each flowcell/channel combination.
    It may optionally be overridden in derived pipelines to modify this behavior.

Help on method analyzeGroup in module pypeline.pipelines.base:

analyzeGroup(self, sms_file, group, env) unbound pypeline.pipelines.base.Pipeline method
    An abstract function which must be overridden when deriving a new pipeline class, it defines the analysis steps to perform on a set of reads.
    
    Keyword arguments:
    sms_file -- current sms input file containing set of reads to analyze 
    group -- prefix describing reads being analyzed, this prefix is used to name output files
    env -- SCons construction environment defining available tools, reporting functions and other data
    
    Failure to override this method in a derived class will raise a NotImplementedError. This is the function where you should define the analysis steps you need to perform for each set of input reads. Helicos has wrapped its command line tools to perform filtering, alignment and error reporting as SCons custom builders so that they can be called from within this function.  Please see Section 4.11.8, “Analysis Tool List” for the builders that are available.

Help on method queryTargets in module pypeline.pipelines.base:

queryTargets(self, group=None, stepName=None, ext=None) unbound pypeline.pipelines.base.Pipeline method
    Returns a list of filenames representing the target outputs generated for a particular analysis step and group of reads
    
    Keyword arguments:
    group -- the group of reads that were analyzed to produce these targets
    stepName -- the analysis step which generated these targets
    ext -- optional, for when you need to filter the set of output targets using a regular expression

Help on method report in module pypeline.pipelines.base:

report(self, env) unbound pypeline.pipelines.base.Pipeline method
    An empty function that may optionally be overridden when deriving a new pipeline class, it defines the summary reports to create using data generated during the analysis of all sets of input reads
    
    Keyword arguments:
    env -- SCons construction environment defining available tools, reporting functions and other data
    
    This function defines the creation of summary reports using the outputs of the analysis steps. Helicos has provided some basic reports, but you can also build your own. The report function is called after all sets of input reads have been analyzed.  For more information, please see Section 4.11.4, “Reports”.