7 Nextflow

This is awesome! Nextflow provides a neat way to develop and structure the entire workflow of your analysis in a single script. It makes a huge difference for one aspect I mentioned in Chapter 2:

“Apart from the scripts you ran, people need to know what to run when and where.” “At least for me, it is usually quite hard even to recall the exact order of my own scripts needed for a complex data analysis […] when I come back half a year later[…].”

Apart from this, using Nextflow can save you a lot of mindless typing when running different versions of a similar analysis. The drawback is that you need to get a basic idea of yet another language (Nextflow is based on Groovy).

But the Nextflow documentation is pretty good. Reading through the manual took me quite some time, but the time I have saved since then made up for it pretty quickly. Also, if you have questions beyond what is covered in the manual, the help provided on gitter is amazing. So far, any question I asked there was solved within about 30 min! (Of course it is important not to spam the people there with unnecessary questions, so first: RTFM.)

7.1 Basics

The basic idea of Nextflow is that it connects the different tasks that you do within bash (or any other scripting language) in an organized but dynamic manner. It then manages your analysis for you (waiting for job A to finish before submitting job B) and supports quite a lot of different cluster scheduling systems. You can use it from the very beginning of your project since it is able to resume a previous run: if you first implement job A and then write job B while job A is running, you can simply update your nextflow script. If you then resume the run, nextflow will remember that it already ran job A and start with job B straight away. This is also great if you later need to change a middle part of your analysis: nextflow will only rerun what is needed downstream from the point that you changed. This is super convenient.

To run a nextflow pipeline I usually prepare two scripts:

  • analysis.nf: this is the actual pipeline
  • nextflow.config: this is the configuration script (important on the cluster, for small local tests not so much)

7.1.1 The pipeline script

The analysis.nf is where you put together your analysis. For me, this means that it is basically a series of interconnected bash snippets.

The nextflow script is built from three types of building blocks:

  • Processes: the bash snippets that contain the code I would otherwise type into the terminal
  • Channels: streams of files you put through your pipeline, or streams of parameter values you want to vary within your analysis
  • Operators: connect the different Processes and Channels using logic (linear, crosses), or modify a channel along the way (filtering, merging, sorting)

Processes

A nextflow process looks like this:

process gather_depth {
    label 'L_loc_test_script'
    publishDir "export_folder", mode: 'copy'

    input:
    set val( input_val ), file( input_file ) from input_channel

    output:
    file( "${input_val}.txt" ) into output_channel
    
    script:
    """
    sed 's/X-TAG-X/${input_val}/g' ${input_file} > ${input_val}.txt
    cat \$HOME/signature.txt >> ${input_val}.txt
    """
}

This breaks down into different compartments:

process /* <process_name> */ {
    /* <config section> */

    input:
    /* <input channel> */

    output:
    /* <output channel> */
    
    script: /* <task> */
    """
    # some bash code
    """
}

In the config section we can assign the process to a label (to use specific config settings defined in nextflow.config). Here we can also specify whether we want to collect the output of the script outside of the work folder that nextflow uses for executing the code (publishDir). You can also activate conda environments or load modules for this process here.

The input and output sections are used to connect the individual processes and to manage the flow of files and values from process to process.

The script section contains the actual code to run. Here, you write the actual bash code of the pipeline.

Careful when using bash variables (e.g. $WORK) within nextflow. Nextflow itself uses the dollar sign ($) to reference its own variables (e.g. ${input_val} in the example script). These are evaluated before the script is run and replaced by the appropriate file/value. If you want to use bash variables that are supposed to be evaluated when the script is actually run, you need to preserve the dollar sign by escaping it (cat \$HOME/signature.txt instead of cat $HOME/signature.txt).
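The same two-stage evaluation can be tried out in plain bash, without Nextflow. The sketch below is only an analogy: an unquoted here-document plays the role of the Nextflow script block, and input_val is set by hand to a made-up value instead of coming from a channel.

```shell
#!/usr/bin/env bash
# Analogy only: an unquoted here-document expands variables when the text is
# generated (like Nextflow filling in its own variables), while an escaped
# dollar sign survives until the generated script is actually run.
input_val="sample_01"   # made-up stand-in for a value delivered by a channel

task_script=$(cat <<EOF
echo "value: ${input_val}"
echo "home: \$HOME"
EOF
)

# The generated text holds the literal value and an untouched $HOME
printf '%s\n' "$task_script"

# Only when the generated script runs is $HOME resolved
task_output=$(bash -c "$task_script")
printf '%s\n' "$task_output"
```

The first printout still contains the text $HOME, the second one shows the actual home directory. Without the backslash, the home directory of the generating shell would already be baked into the script text.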

Channels

These are the streams of files or values. One channel could be e.g. all the sequencing files you need to process, while another one could be all the values of an input parameter of a model that you wish to test for different input settings.

Channels connect the different processes, and Nextflow basically follows the starting channels as they develop through the different steps of your pipeline. You feed a channel into a process in the input section, while the output section emits a new channel.

At the start of your pipeline you will need to define some channels like this:

Channel.from( 'A', 'B', 'C' )
Channel.from( ('01'..'09') + ('10'..'19'))
Channel.fromPath( 'genotypes.vcf.gz' )
Channel.fromPath( 'sequencing_data/*.fq.gz' )

Operators

Operators contain the actions/logic that connect the different elements of your pipeline. You can use them to modify your channels by filtering, transforming, splitting, or merging them. We already saw two of them in the example above (.from() & .fromPath(); strictly speaking, the Nextflow documentation calls these channel factories rather than operators). So you can see that they have the form dot-name() (or dot-name{}).

Operators (besides .from(), .fromPath() & .fromFilePairs()) I frequently use are:

  • .splitCsv(): read csv (combined variables)
  • .set{}: channel name (single copy)
  • .into{}: channel name (multiple copies)
  • .map{}: transform channel content
  • .collect(): collapse the channel
  • .combine(): cross of two channels
  • .join(): merge two channels (sorted)
  • .groupTuple(): collapse the channel (sorted)

7.2 A quick example

I want to create a quick example of the power of nextflow. Therefore, let’s assume we have to do an analysis where a set of populations needs to be compared pairwise (e.g. calculate pairwise FST along the genome). After this, we want to compute a running average using different window sizes.

I will use dummy commands here to show the workings of nextflow rather than doing a meaningful analysis:

The beauty of nextflow is that you basically just have to write the script for a single case. (You can check the complete nextflow script in the demo_root_folder repository.) First, we initialize the different Channels within our pipeline script.

Channel
    .fromPath("data_file.txt")
    .set{ data_channel }

Channel
    .from( [[1, "popA"], [2, "popB"], [3, "popC"]] )
    .into{ pop_channel1; pop_channel2 }

Channel
    .from( "10kb", "50kb" )
    .set{ span_channel }

Then, we combine the different channels using operators:

data_channel                 /* start with the raw data */
    .combine( pop_channel1 )  /* add pop1 to data */
    .combine( pop_channel2 )  /* cross with pop2 */
    .filter{ it[1] < it[3] }  /* discard the upper triangle of the cross */
    .map{ it[0,2,4]}          /* select only data & pops (remove indexes) */
    .combine( span_channel )  /* cross with sensitivities */
    .set{ pairs_channel }     /* name output channel */
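
To see what ends up in pairs_channel, the same bookkeeping can be mimicked with nested loops in plain bash. This is only a sketch of the logic, not Nextflow code; the pops and spans arrays simply repeat the values of the channels defined above, and the array index stands in for the numeric population id.

```shell
#!/usr/bin/env bash
# Plain-bash imitation of the combine / filter / map / combine chain.
pops=(popA popB popC)   # array index plays the role of the ids 1, 2, 3
spans=(10kb 50kb)

pairs=()
for i in "${!pops[@]}"; do
  for j in "${!pops[@]}"; do
    # like .filter{ it[1] < it[3] }: keep every unordered pair only once
    if (( i < j )); then
      for span in "${spans[@]}"; do
        # like .map{ it[0,2,4] } plus the final .combine( span_channel )
        pairs+=("data_file.txt ${pops[i]} ${pops[j]} ${span}")
      done
    fi
  done
done

printf '%s\n' "${pairs[@]}"
echo "number of tasks: ${#pairs[@]}"
```

Three populations give three pairs, and two window sizes double that to six tuples, so the process that reads from pairs_channel is started six times.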

Finally, we run the actual bash commands:

process run_pairewise {
    publishDir "output/${span}", mode: 'copy'

    input:
    set file( data ), val( pop1 ), val( pop2 ), val( span ) from pairs_channel

    output:
    file( "step1.${pop1}-${pop2}.${span}.txt" ) into step1_channel

    script:
    """
    cat ${data} > step1.${pop1}-${pop2}.${span}.txt                 # check data content
    echo "${pop1} vs. ${pop2}" >> step1.${pop1}-${pop2}.${span}.txt # run pairwise 'fst'
    echo "-- ${span} --" >> step1.${pop1}-${pop2}.${span}.txt       # running average
    """
}

That’s all it takes:

We can see that the three pairwise comparisons were completed for both averaging sensitivities (window sizes), since all the expected output files show up in the output folder.

But nextflow created a few more files and folders:

  • .nextflow: the folder where nextflow does its housekeeping
  • .nextflow.log: the log of the nextflow run
  • output: the output folder (we specifically asked for this one within the process)
  • work: the folder where nextflow executes the scripts. The weird prefixes (e.g. [d7/fccc1d]) in the terminal output refer to subdirectories of work

Of course I do not want git to remember the whole messy nextflow inner workings (the script is enough). So, I quickly add those files to the .gitignore (make sure to use the double >>!).
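
A minimal sketch of that step, to be run from the root of the repository. The ignore patterns are an assumption based on the files and folders listed above; whether the output folder should be ignored as well depends on how you want to share your results.

```shell
#!/usr/bin/env bash
# Append (>>) the nextflow housekeeping paths to .gitignore.
# A single > would overwrite whatever is already in the file.
# The three patterns are assumptions derived from the list above.
{
  echo ".nextflow/"
  echo ".nextflow.log*"
  echo "work/"
} >> .gitignore

cat .gitignore
```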

Now, we made some changes - so let’s update the repository (and github). First, only the .gitignore.

Then we add the rest:
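
Both commits can be tried out in a scratch repository, as in the sketch below. The dummy files, the git identity, and the commit messages are made up for illustration; in the real project you would only need the add and commit lines, followed by a push to github.

```shell
#!/usr/bin/env bash
# Scratch-repository demo of the two commits. File names follow the chapter;
# identity, dummy files and commit messages are made up for illustration.
demo_repo=$(mktemp -d)
cd "$demo_repo" || exit 1
git init -q
git config user.name "Demo User"
git config user.email "demo@example.org"

# Stand-ins for the real project files
touch analysis.nf nextflow.config
mkdir -p work output
touch work/scratch.txt output/result.txt
printf '%s\n' ".nextflow/" ".nextflow.log*" "work/" >> .gitignore

# First commit: only the .gitignore
git add .gitignore
git commit -q -m "ignore nextflow housekeeping files"

# Second commit: everything else that is not ignored
git add .
git commit -q -m "add pipeline, config and output"

git log --oneline
# git push   # assumption: a github remote is already configured
```

Committing the .gitignore first means that the later git add . can no longer pick up the work folder by accident.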

Now, in the real world, I would write the nextflow script locally. Then I would commit my changes to github and pull them onto the cluster from there. Finally I would run nextflow on the cluster (nextflow run analysis.nf).

7.3 Managing the workflow on the cluster

The nice thing about running nextflow on the cluster is that it works pretty much the same as locally - so you don’t have to deal with submitting jobs because nextflow will take care of this.

The tricky part about this is that you will need to keep nextflow running for the whole time - yet we still want to be able to log out from the cluster and shut down the laptop at the end of the day (and not keep an open ssh session for 10 days until the analysis has finished).

For this we can use screen. Basically we create a virtual session that will continue to run even if we log out from the cluster (even though we thereby close the session that created it in the first place). This is ideal for our nextflow needs: It enables us to

  1. log on to the cluster and create a screen session
  2. start the nextflow pipeline from within screen
  3. detach from the screen session by pressing Ctrl+a followed by Ctrl+d
  4. log out from the cluster and shut down our laptop
  5. (two days later) log on to the cluster
  6. reattach to the old screen session (screen -DR or screen -d -r <session-id>)
  7. check if nextflow is still running (and kill it since we by now found a typo in the pipeline…)
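
In terms of commands, this cycle could look roughly like the sketch below. The session name nf_run and the two helper functions are made up for illustration; the screen options themselves (-S, -ls, -d -r) are standard.

```shell
#!/usr/bin/env bash
# Hypothetical helpers wrapping the screen cycle described above.
# SESSION is an illustrative name, not part of the original setup.
SESSION="nf_run"

# Steps 1-2: open a named screen session; inside it, start the pipeline, e.g.
#   nextflow run analysis.nf -c nextflow.config
# Step 3: detach with Ctrl+a followed by Ctrl+d.
start_nf_session() {
  screen -S "$SESSION"
}

# Steps 5-6: list the existing sessions, then reattach to the named one
# (-d detaches it first in case it is still attached somewhere else).
resume_nf_session() {
  screen -ls
  screen -d -r "$SESSION"
}
```

Giving the session a name with -S saves you from looking up the numeric session id later.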

There is one more trick when using screen with the cluster:

My metaphor with the hotel was not exactly accurate (surprise…). That is because there are actually three login nodes instead of a single one, and with the standard login, the node is chosen at random (I think).

Now, if you start a screen session on one login node and later log on to a different one, you will not find your screen session again. Luckily, you can also choose a specific login node by using a numbered host name (e.g. nesh-fe2) instead of the generic one.

To make things a little easier, I added these two lines to my ~/.bashrc file:
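
A sketch of what such lines could look like. Every user name, host, and path below is a placeholder rather than a real setting, the helper variables are only there to make the placeholders obvious, and using sshfs for the mounting is an assumption.

```shell
# Hypothetical ~/.bashrc lines. User name, host, and both paths are
# placeholders; sshfs is an assumption about how the mount is done.
NEC_USER="your_user_name"
NEC_NODE="nesh-fe2"                  # always the same login node
NEC_WORK="/path/to/your/work_dir"    # your $WORK directory on the cluster
NEC_MOUNT="$HOME/mnt/nec"            # local mount point (must exist)

alias toNEC="ssh ${NEC_USER}@${NEC_NODE}"
alias mountNEC="sshfs ${NEC_USER}@${NEC_NODE}:${NEC_WORK} ${NEC_MOUNT}"
```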

Of course you will need to adjust the mounting path to match your $WORK directory on the cluster.

Now, all I need to type to mount/log on to the cluster is toNEC or mountNEC and I will always log onto the same log in node.

7.4 The config script

The nextflow.config sets up some global settings for your nextflow run. To use the NEC computer cluster from Uni Kiel, I add this section to the config script:

env.BASE_DIR = "$WORK/project_dir"

process {
    executor = 'nqsii'
    queue = 'clmedium'
    cpus = 1
    memory = 4.GB
    time = { 1.h * task.attempt }
    errorStrategy = { task.exitStatus == Integer.MAX_VALUE ? 'retry' : 'finish' }
    maxRetries = 1
    maxErrors = '-1'

    withLabel: "L_loc.*" {
        executor = 'local'
    }

    withLabel: "L_20g15m_.*" {
        queue = 'clexpress'
        memory = 20.GB
        time = { 15.m * task.attempt }
    }

    withLabel: "L_---_.*" {
        clusterOptions = '---'
        queue = '---'
        memory = ---.GB
        cpus = ---
        time = { ---.h * task.attempt }
    }
}

trace { enabled = true }
report { enabled = true }

So, I set a few defaults for the jobs, and then start defining the different types of processes that I reference (via their labels) within the analysis.nf script. Please refer to the nextflow documentation for the different options set within the process section.

Apart from this, I also provide the path to the project folder and turn the trace and the report on by default.

7.5 Quick recap

So, in a nutshell, my workflow with nextflow is as follows:

  1. Initialize the repository (including the analysis.nf) locally
  2. Push to github
  3. Pull to cluster
  4. Start nextflow (cluster)
  5. Develop further steps & correct bugs of the analysis.nf (locally)
  6. Update github & cluster
  7. Resume nextflow to run the new parts of the pipeline (cluster)
  8. Rinse and repeat until the analysis is complete

To resume a nextflow run:
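
A minimal sketch, wrapped in a small helper function so that the script name can be swapped. The name nf_resume is made up; -c and -resume are regular options of nextflow run.

```shell
#!/usr/bin/env bash
# Hypothetical helper: rerun the pipeline and reuse the cached results of all
# steps that did not change (-resume). Defaults to analysis.nf and
# nextflow.config, the file names used in this chapter.
nf_resume() {
  local pipeline="${1:-analysis.nf}"
  nextflow run "$pipeline" -c nextflow.config -resume
}

# usage (on the cluster, inside the screen session):
#   nf_resume
```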

By the way - you can also generate a nice graphical summary of your pipeline using nextflow run analysis.nf -with-dag analysis.png -c nextflow.config -resume (rendering to png requires Graphviz to be installed).