file postprocessor_1.0.0/postprocessor_1.0.0/postprocessor.cpp

[No description available] More…

Functions

Name
scanner_plugin(postprocessor , version(1, 0, 0) )

Detailed Description

“Postprocessing” scanner plugin. Reads points from old scan output and re-runs a likelihood-containing plugin for those same points. Can perform some simple addition/subtraction operations of likelihood components from the new plugin output.


Authors (add name and date if you modify):

Functions Documentation

function scanner_plugin

scanner_plugin(
    postprocessor ,
    version(1, 0, 0) 
)

The likelihood container plugin

MPI data

The reader object in use for the scan

The main postprocessing driver object

Options for PPDriver;

The constructor to run when the plugin is loaded.

Main run function

Determine what data needs to be copied from the input file to the new output dataset

Source code

//  GAMBIT: Global and Modular BSM Inference Tool
//  *********************************************
///  \file
///
///  "Postprocessing" scanner plugin. Reads points
///  from old scan output and re-runs a likelihood
///  containing plugin for those same points.
///  Can perform some simple addition/subtraction
///  operations of likelihood components from
///  the new plugin output.
///
///  *********************************************
///
///  Authors (add name and date if you modify):
//
///  \author Ben Farmer
///          (ben.farmer@gmail.com)
///  \date 2016 Mar, 2017 Jan, Feb, Mar
///
///  *********************************************

// STL
#include <vector>
#include <string>
#include <cmath>
#include <cstdio>
#include <iostream>
#include <fstream>
#include <sstream>

// GAMBIT
#include "gambit/Utils/mpiwrapper.hpp"
#include "gambit/Utils/util_functions.hpp"
#include "gambit/Utils/new_mpi_datatypes.hpp"
#include "gambit/ScannerBit/scanners/postprocessor_1.0.0/postprocessor.hpp"
#include "gambit/ScannerBit/objective_plugin.hpp"
#include "gambit/ScannerBit/scanner_plugin.hpp"

using namespace Gambit;
using namespace Gambit::PostProcessor;


// The reweigher Scanner plugin
scanner_plugin(postprocessor, version(1, 0, 0))
{
  // Inifile entries this plugin declares as required. Both are read in the
  // plugin constructor: "like" names the likelihood container purpose and
  // "reader" holds the options used to set up the reader for the old points.
  reqd_inifile_entries("like","reader");

  /// The likelihood container plugin
  /// (retrieved in the constructor via get_purpose(), using the "like" entry)
  like_ptr LogLike;

  /// MPI data
  /// 'numtasks' is the number of processes sharing the workload and 'rank' is
  /// the index of this process among them. Both are set in the constructor,
  /// either from MPI_COMM_WORLD or, when the 'use_virtual_rank' option is
  /// enabled, from the SIZE and RANK environment variables.
  unsigned int numtasks;
  unsigned int rank;

  /// The reader object in use for the scan
  /// (obtained from the printer in the constructor under the name "old_points";
  /// NOTE(review): presumably owned by the printer, not by this plugin -- confirm)
  Gambit::Printers::BaseBaseReader* reader;

  /// The main postprocessing driver object
  /// (default-constructed here; replaced by a fully configured PPDriver at the
  /// start of plugin_main)
  PPDriver driver;

  /// Options for PPDriver;
  /// (filled in by the constructor and plugin_main, then passed to PPDriver)
  PPOptions settings;

  /// Retrieve an integer from an environment variable.
  ///
  /// @param name  Name of the environment variable to read.
  /// @return      The value of the variable converted to int.
  ///
  /// A scan_error is raised if the variable is not defined, or if its contents
  /// are not exactly one integer (surrounding whitespace is tolerated). Values
  /// such as "4" or " 4 " are accepted; "", "abc", "4x" and "4.5" are rejected
  /// rather than being silently truncated to their numeric prefix.
  int getintenv(const std::string& name)
  {
     int x = 0;
     if(const char* env_p = std::getenv(name.c_str()))
     {
       std::stringstream env_s(env_p);
       bool converted = static_cast<bool>(env_s >> x);
       if (converted && !env_s.eof())
       {
          // Something follows the number. Only whitespace is acceptable there.
          // (The eof() guard above matters: applying std::ws to a stream that
          // is already at end-of-file may set failbit.)
          env_s >> std::ws;
          converted = env_s.eof();
       }
       if (!converted)
       {
          std::ostringstream err;
          err << "Tried to retrieve value of environment variable "<<name<<" as an integer, but conversion failed! String retrieved was '"<<env_s.str()<<"'";
          scan_error().raise(LOCAL_INFO,err.str());
       }
     }
     else
     {
       std::ostringstream err;
       err << "Tried to retrieve value of environment variable "<<name<<" as an integer, but it does not seem to be defined!";
       scan_error().raise(LOCAL_INFO,err.str());
     }
     return x;
  }

  /// The constructor to run when the plugin is loaded.
  ///
  /// In order, this:
  ///  - determines the MPI rank and size (or the "virtual" rank and size taken
  ///    from the RANK and SIZE environment variables when the
  ///    'use_virtual_rank' option is true),
  ///  - creates the reader for the previously generated points and records
  ///    the labels of all data it provides,
  ///  - copies the plugin's inifile options into 'settings',
  ///  - retrieves the likelihood container and disables its ability to shut
  ///    the scan down directly,
  ///  - makes sure the directory used for resume files exists.
  ///
  /// A scan_error is raised if the virtual rank system is requested inside a
  /// real MPI job of size > 1, or if RANK/SIZE do not describe a valid
  /// configuration (RANK >= 0, SIZE >= 1, RANK < SIZE).
  plugin_constructor
  {
    int s_numtasks;
    int s_rank;

    // Get MPI data. No communication is needed, we just need to know how to
    // split up the workload. Just a straight division among all processes is
    // used, nothing fancy.
#ifdef WITH_MPI
    MPI_Comm_size(MPI_COMM_WORLD, &s_numtasks); // MPI writes these as (signed) ints; they are converted to unsigned afterwards
    MPI_Comm_rank(MPI_COMM_WORLD, &s_rank);
#else
    s_numtasks = 1;
    s_rank = 0;
#endif
    numtasks = s_numtasks;
    rank = s_rank;

    if(rank==0) std::cout << "Initialising 'postprocessor' plugin for ScannerBit..." << std::endl;

    // Get options for setting up the reader (these live in the inifile under:
    // Scanners:
    //  scanners:
    //    scannername:
    //      reader
    Gambit::Options reader_options = get_inifile_node("reader");

    // Initialise reader object
    get_printer().new_reader("old_points",reader_options);

    // Retrieve the reader object
    reader = get_printer().get_reader("old_points");

    // Get names of all the output data labels
    settings.data_labels = reader->get_all_labels();

    // Set up other options for the plugin
    settings.update_interval = get_inifile_value<std::size_t>("update_interval", 1000);
    settings.add_to_logl = get_inifile_value<std::vector<std::string>>("add_to_like", std::vector<std::string>());
    settings.subtract_from_logl = get_inifile_value<std::vector<std::string>>("subtract_from_like", std::vector<std::string>());
    // NOTE(review): no default is supplied for 'reweighted_like', so it is
    // presumably mandatory even though it is not listed in reqd_inifile_entries -- confirm.
    settings.reweighted_loglike_name = get_inifile_value<std::string>("reweighted_like");

    settings.renaming_scheme = get_inifile_value<std::map<std::string,std::string>>("rename",
                          std::map<std::string,std::string>());

    settings.cut_less_than = get_inifile_value<std::map<std::string,double>>("cut_less_than",
                          std::map<std::string,double>());

    settings.cut_greater_than = get_inifile_value<std::map<std::string,double>>("cut_greater_than",
                          std::map<std::string,double>());

    settings.discard_points_outside_cuts = get_inifile_value<bool>("discard_points_outside_cuts", false);

    // Use virtual rank system?
    if(get_inifile_value<bool>("use_virtual_rank",false))
    {
        #ifdef WITH_MPI
        if(numtasks>1)
        {
          std::ostringstream err;
          err << "You have set the 'use_virtual_rank' option for the postprocessor scanner plugin to 'true', which will allow the plugin to act as if it is part of an MPI ensemble when it really isn't, however you are also running this task in an MPI batch with size > 1! You cannot use the virtual rank system at the same time as running a real MPI job! Please choose one configuration or the other and rerun the job.";
          scan_error().raise(LOCAL_INFO,err.str());
        }
        #endif
        // Keep the values signed until they have been validated; assigning a
        // negative value straight to the unsigned rank/numtasks would wrap
        // around to a huge number and could slip past the check below.
        const int virtual_rank = getintenv("RANK");
        const int virtual_size = getintenv("SIZE");
        if(virtual_rank<0 || virtual_size<1)
        {
          std::ostringstream err;
          err << "Environment variables RANK and SIZE must satisfy RANK>=0 and SIZE>=1, but RANK="<<virtual_rank<<" and SIZE="<<virtual_size<<" were found while running postprocessor scanner plugin with 'use_virtual_rank=true' option. This is not a valid MPI configuration, so it is an illegal choice of virtual configuration.";
          scan_error().raise(LOCAL_INFO,err.str());
        }
        rank     = virtual_rank;
        numtasks = virtual_size;
        if(rank>=numtasks)
        {
          std::ostringstream err;
          err << "Environment variable RANK was larger than permitted by SIZE ("<<rank<<">="<<numtasks<<") while running postprocessor scanner plugin with 'use_virtual_rank=true' option. This is not a valid MPI configuration, so it is an illegal choice of virtual configuration.";
          scan_error().raise(LOCAL_INFO,err.str());
        }
    }
    // Transfer MPI variables to PPOptions
    settings.rank = rank;
    settings.numtasks = numtasks;

    // Finally, there is the 'Purpose' value of the likelihood container. This may well clash
    // with the old name used in the input file, so better check for this and make the user
    // change their choice if so.
    settings.logl_purpose_name = get_inifile_value<std::string>("like");
    settings.discard_old_logl = get_inifile_value<bool>("permit_discard_old_likes",false);

    // Retrieve the external likelihood calculator
    LogLike = get_purpose(settings.logl_purpose_name);

    // Do not allow GAMBIT's own likelihood calculator to directly shut down the scan.
    // This scanner plugin will assume responsibility for this process, triggered externally by
    // the 'plugin_info.early_shutdown_in_progress()' function.
    LogLike->disable_external_shutdown();

    // Path to save resume files
    std::string defpath = get_inifile_value<std::string>("default_output_path");
    settings.root = Utils::ensure_path_exists(defpath+"/postprocessor/resume");
    if(rank==0) std::cout << "root: " << settings.root << std::endl;
  }

  /// Main run function
  ///
  /// Builds the PPDriver from the reader, the primary print stream, the
  /// likelihood container and 'settings', then repeatedly calls
  /// driver.run_main_loop() until there is nothing left to do or a quit flag
  /// is seen.
  ///
  /// With MPI, a process that finishes its own batch asks all others to stop
  /// so that the remaining points can be redistributed; the next pass then
  /// runs as if resuming, skipping the chunks reported by get_done_points().
  /// Without MPI there is nobody to redistribute with, so the loop ends as
  /// soon as this process has finished its batch.
  ///
  /// @return Always 0. Failures are reported by raising a scan_error.
  int plugin_main()
  {
    if(rank==0) std::cout << "Running 'postprocessor' plugin for ScannerBit." << std::endl;

    // Set up our MPI communicator
    #ifdef WITH_MPI
    GMPI::Comm ppComm;
    ppComm.dup(MPI_COMM_WORLD,"PostprocessorComm"); // duplicates MPI_COMM_WORLD
    // Message tag definitions in PPDriver class:
    #endif

    /// Determine what data needs to be copied from the input file to the new output dataset
    // Get labels of functors listed for printing from the primary printer.
    settings.all_params = get_printer().get_stream()->getPrintList();
    // There are some extra items that will also be automatically printed in all scans,
    // so we need to avoid copying those:
    settings.all_params.insert("unitCubeParameters"); // It would be better to keep the originals here, but currently cannot turn off the printing from within like_ptr.
    settings.all_params.insert("MPIrank"); // These should be re-printed the same as they were anyway
    settings.all_params.insert("pointID");
    settings.all_params.insert(settings.logl_purpose_name); // If there is a name clash and the run was not aborted, we are to discard the old data under this name.
    settings.all_params.insert("Modified" + settings.logl_purpose_name);
    settings.all_params.insert(settings.reweighted_loglike_name); //   "  "
    #ifdef WITH_MPI
    settings.comm = &ppComm;
    #endif

    // Construct the main driver object
    driver = PPDriver(reader,get_printer().get_stream(),LogLike,settings);

    // Check that the supplied settings make sense
    driver.check_settings();

    // Points which have already been processed in a previous (aborted) run
    ChunkSet done_chunks; // Empty by default

    // Ask the printer if this is a resumed run or not, and check that the necessary files exist if so.
    bool resume = get_printer().resume_mode();

    //MAIN LOOP HERE
    bool continue_processing = true;
    #ifdef WITH_MPI
      bool quit_flag_seen = false;
    #endif
    while(continue_processing)
    {
       #ifdef WITH_MPI
         bool I_am_finished = false;
       #endif
       if (resume) // If resuming (or redistributing), get the required resume data
       {
          done_chunks = get_done_points(settings.root);
          // For debugging: report on the retrieved chunks
          //for(auto it=done_chunks.begin(); it!=done_chunks.end(); ++it)
          //   std::cout << "Rank "<<rank<<": retrieved done_chunk ["<<it->start<<","<<it->end<<"]"<<std::endl;
       }

       bool do_redistribution = false;
       int exit_code;
       // 0 - Finished processing all the points we were assigned
       // 1 - Saw quit flag and so stopped prematurely
       // 2 - Was told to stop by some other process for redistribution of postprocessing work
       // 3 - Encountered end of input file unexpectedly
       exit_code = driver.run_main_loop(done_chunks);
       std::cout << "Rank "<<rank<<": exited loop with code "<<exit_code<<std::endl;
       if(exit_code==0)
       {
          #ifdef WITH_MPI
            I_am_finished = true;
          #endif
          std::cout << "Rank "<<rank<<" has finished processing its batch; requesting work from other processes." << std::endl;
          // We are finished, but there might still be lots of points to process
          // that are assigned to other cpus in this job. We will request that
          // all processes stop and redistribute their workload.
          // But first, we check if anyone else has already requested this stop!
          // In that case there is no need to send another stop message.
          #ifdef WITH_MPI
            if(not driver.check_for_redistribution_request())
            {
               driver.send_redistribution_request();
            }
            do_redistribution = true;
          #else
            // No MPI means no other processes to share work with, so having
            // finished our own batch there is nothing left to do. Without this
            // the loop has no way to terminate in a non-MPI build.
            continue_processing = false;
          #endif
       }
       else if(exit_code==1)
       {
          // Saw quit flag, time to stop
          #ifdef WITH_MPI
            quit_flag_seen = true;
          #endif
          continue_processing = false;
          do_redistribution = true; // Need to unlock processes that may be waiting for point redistribution
       }
       else if(exit_code==2)
       {
          // Stopped due to workload redistribution request from another process.
          do_redistribution = true;
       }
       else if(exit_code==3)
       {
          // That shouldn't happen; report it on stderr and raise a scan error
          std::ostringstream err;
          err << "Postprocessing on "<<rank<<" ended due to encountering the end of the input file. This indicates that it was told to process more points than existed in the input file, which indicates a bug in the postprocessor. Your output may still be fine, but please report this bug.";
          std::cerr << err.str() << std::endl;
          scan_error().raise(LOCAL_INFO,err.str());
       }
       else
       {
          std::ostringstream err;
          err << "Postprocessing on "<<rank<<" terminated with an unrecognised return code ("<<exit_code<<"). This indicates a bug in the postprocessor, please report it.";
          scan_error().raise(LOCAL_INFO,err.str());
       }

       // Redistribute points, stop if someone has seen the quit flag, or stop if there are no points left to process
       if(do_redistribution)
       {
          #ifdef WITH_MPI
            /// @{ Gather all 'I_am_finished' flags to see if everyone is done
            int my_finished = (I_am_finished ? 1 : 0); // convert to int in a way that prevents unused-variable warnings
            std::vector<int> all_finished = allgather_int(my_finished, ppComm);
            bool everyone_finished = true;
            for(const int finished_flag : all_finished)
            {
               if(finished_flag!=1)
               {
                  everyone_finished = false; // If anyone has not finished, we proceed with redistribution.
                  break;
               }
            }
            /// @}

            // All processes should now be synchronised; receive all the redistribution requests
            std::cout << "Rank "<<rank<<": Clearing redistribution request messages" << std::endl;
            driver.clear_redistribution_requests();

            if(not everyone_finished)
            {
               /// @{ Gather all quit flags, to see if we need to stop (COLLECTIVE OPERATION)
               int my_quit = (quit_flag_seen ? 1 : 0); // convert to int in a way that prevents unused-variable warnings
               std::vector<int> all_quit_flags = allgather_int(my_quit, ppComm);
               for(const int quit_flag : all_quit_flags)
               {
                  if(quit_flag==1)
                  {
                     continue_processing = false; // If anyone has seen the quit flag, we must stop.
                     break;
                  }
               }
               /// @}
               if(continue_processing)
               {
                  /// Not quitting; do the redistribution.
                  //if(rank==0) std::cout << "Some processes have finished their work, but others are still going. Redistributing remaining workload amongst all available cpus" << std::endl;
                  std::cout << "Rank "<<rank<<": Some processes have finished their work, but others are still going. Redistributing remaining workload amongst all available cpus" << std::endl;
                  resume = true; // Perform next loop as if we are resuming the scan.
               }
               else
               {
                  if(rank==0) std::cout << "Quit flag seen by one or more worker processes; stopping postprocessor." << std::endl;
               }
            }
            else
            {
               // Everyone is finished! We can stop
               continue_processing = false;
               if(rank==0) std::cout << "All processes report that they are finished with their postprocessing batches. No work left to do, so we will stop." << std::endl;
            }
          #endif
       }
    }
    //if(rank==0) std::cout << "Done!" << std::endl;
    std::cout << "Rank "<< rank<< ": Done!" << std::endl;

    // Test barrier to see if everyone made it
    #ifdef WITH_MPI
    ppComm.Barrier();
    if(rank==0) std::cout << "Passed final PP barrier" << std::endl;
    #endif
    return 0;
  }
}

Updated on 2024-07-18 at 13:53:33 +0000