AnalysisFlow.cxx
1 #include "AnalysisFlow.h"
2 #include "OutputConvention.h"
3 #include "ProgressBar.h"
4 #include "FilterStrategy.h"
5 #include "QualityCut.h"
6 #include "AcclaimFilters.h"
7 #include "NoiseMonitor.h"
8 #include "AcclaimCmdLineArgs.h"
9 #include "UsefulAnitaEvent.h"
10 #include "Compression.h"
11 
12 ClassImp(Acclaim::AnalysisFlow);
13 
14 
16 
17  fOutFileBaseName = args->output_filename;
18 
19  fSelection = (AnalysisFlow::selection) args->event_selection;
20  fFilterStrat = filterStrat;
21 
22  if(!checkForSgeTaskId()){
23  fDivision = args->division;
24  fNumDivisions = args->numdivisions;
25  fRun = args->run;
26  }
27 
28 
29  fSumTree = NULL;
30  fData = NULL;
31  fReco = NULL;
32  fOutFile = NULL;
33  fSettings = NULL;
34  fEventSummary = NULL;
35  fEv = NULL;
36 
37  fFirstEntry=0;
38  fLastEntry=0;
40  fDebug = 0;
43  fBlindStrat = AnitaDataset::kNoBlinding;
44 
45  prepareEverything(args->settings_filename);
46 }
47 
48 
49 
50 
51 
52 
54  // on the Hoffman2 cluster, this is the task ID
55  // we're going to use it to try and figure out the run, division and numdivision
56  // assuming this wasn't explicitly set...
57  const char* sgeTaskId = "SGE_TASK_ID";
58  const char* sgeTaskIdEnv = getenv(sgeTaskId);
59 
60  if(sgeTaskIdEnv){
61  // we have runs 130-439
62  // I'm going to set the job array like:
63  // 130-439 for 1 job per run = 309 jobs
64  // 1300-4399 for 10 jobs per run = 3099 jobs
65  // 13000-43999 for 100 jobs per run = 30999 jobs (that's probably excessive)
66  Int_t jobArrayIndex = atoi(sgeTaskIdEnv);
67 
68  if(jobArrayIndex < 1e3){
69  fNumDivisions = 1;
70  fRun = jobArrayIndex;
71  fDivision = 0;
72  }
73  else if(jobArrayIndex < 1e4){
74  fNumDivisions = 10;
75  fRun = jobArrayIndex/fNumDivisions;
76  fDivision = jobArrayIndex%10;
77  }
78  else if(jobArrayIndex < 1e5){
79  fNumDivisions = 100;
80  fRun = jobArrayIndex/fNumDivisions;
81  fDivision = jobArrayIndex%100;
82  }
83  else{
84  std::cerr << "Error in " << __FILE__ << " couldn't figure out the run/divisions from " << sgeTaskId << ". Giving up." << std::endl;
85  exit(1);
86  }
87 
88  std::cout << "Info in " << __PRETTY_FUNCTION__ << ". "
89  << "Found " << sgeTaskId << " " << sgeTaskIdEnv
90  << ", so the jobArrayIndex = " << jobArrayIndex
91  << ", set fRun = " << fRun
92  << ", fNumDivisions = " << fNumDivisions
93  << ", fDivision = " << fDivision << "." << std::endl;
94  return true;
95  }
96  else{
97  return false;
98  }
99 }
100 
101 
102 
104 
105  if(fEv){
106  delete fEv;
107  fEv = NULL;
108  }
109 
110  if(fData){
111  delete fData;
112  fData = NULL;
113  }
114 
115  if(fSettings){
116  if(fOutFile){
117  fSettings->write(fOutFile);
118  }
119  delete fSettings;
120  }
121 
122  if(fReco){
123  delete fReco;
124  fReco = NULL;
125  }
126 
127  if(fFilterStrat){
128  delete fFilterStrat;
129  fFilterStrat = NULL;
130  }
131 
132  if(fOutFile){
133  fOutFile->Write();
134  fOutFile->Close();
135  fOutFile = NULL;
136  }
137 }
138 
139 
140 
141 
143 
144  if(fData==NULL){
145  bool doDecimated = fSelection == kDecimated ? true : false;
146  fData = new AnitaDataset(fRun, doDecimated, WaveCalType::kDefault, AnitaDataset::ANITA_ROOT_DATA, fBlindStrat);
147 
148  Long64_t numEntriesInRun = fData->N();
149 
150  Long64_t numEventsPerDivision = numEntriesInRun/fNumDivisions;
151 
152  fFirstEntry = numEventsPerDivision*fDivision;
153  fLastEntry = fFirstEntry + numEventsPerDivision;
154 
155  if(fDivision==fNumDivisions-1){
156  Long64_t numRemainder = numEntriesInRun%fNumDivisions;
157  fLastEntry += numRemainder;
158  }
159 
160  // std::cerr << "Info in " << __PRETTY_FUNCTION__ << "Current blinding strategy: " << fData->getDescription(fData->getStrategy()) << std::endl;
161 
162  }
163 }
164 
165 
167 
168 
169  if(fOutFile==NULL && fOutFileBaseName!=""){
170 
171  // The output convention was originally designed to take argc and argv from the start of main
172  // so here I reconstruct what they might have been.
173  // Previously I used the argv to specify the run and subdivision in the file name, so I do that again here.
174 
175  std::vector<char*> fakeArgv;
176  fakeArgv.push_back((char*) fOutFileBaseName.Data());
177  TString runStr = TString::Format("%d", fRun);
178  fakeArgv.push_back((char*) runStr.Data());
179 
180  TString extraDigits;
181  if(fNumDivisions > 1){
182 
183  Int_t numDigitsTotal = floor(TMath::Log10(fNumDivisions) + 1);
184  Int_t numDigitsThis = fDivision == 0 ? 1 : floor(TMath::Log10(fDivision) + 1);
185 
186  // std::cout << fNumDivisions << "\t" << numDigitsTotal << std::endl;
187  // std::cout << fDivision << "\t" << numDigitsThis << std::endl;
188 
189  for(int i=0; i < numDigitsTotal - numDigitsThis; i++){
190  // std::cout << i << "\t" << numDigitsTotal - numDigitsThis << std::endl;
191 
192  extraDigits += TString::Format("0");
193  }
194  extraDigits += TString::Format("%d", fDivision);
195 
196  // std::cout << extraDigits << std::endl;
197 
198  fakeArgv.push_back((char*) extraDigits.Data());
199  }
200  Int_t fakeArgc = (Int_t) fakeArgv.size();
201 
202  // std::cout << fakeArgc << ", " << &fakeArgv[0] << std::endl;
203  // for(int i=0; i < fakeArgc; i++){
204  // std::cout << "\t:" << i << ", " << fakeArgv[i] << std::endl;
205  // }
206 
207  OutputConvention oc(fakeArgc, &fakeArgv[0]);
208 
209  fOutFile = oc.makeFile();
210  if(fOutFileCompressionLevel >= 0){
211  fOutFile->SetCompressionLevel(fOutFileCompressionLevel);
212  // kUseGlobalCompressionSetting,
213  // kZLIB,
214  // kLZMA,
215  // kOldCompressionAlgo
216  fOutFile->SetCompressionAlgorithm((ROOT::ECompressionAlgorithm) fOutFileCompressionAlgo);
217  }
218 
219  // std::cout << fOutFile << "\t" << fOutFile->GetName() << std::endl;
220  }
221 
222 }
223 
224 
225 
227 
228  Bool_t doEvent = false;
229 
230  switch (fSelection){
231 
232  case kWaisPulser:
233  doEvent = isPulserWAIS(header, usefulPat);
234  break;
235 
236  case kDecimated:
237  doEvent = true;
238  break;
239 
240  case kAll:
241  doEvent = true;
242  break;
243 
244  // case kWaisPulserAndNonRF:
245  // doEvent = isPulserWAIS(header, usefulPat) || !(header->getTriggerBitRF());
246  // break;
247 
248  default:
249  std::cerr << "Warning in " << __PRETTY_FUNCTION__ << ", unknown event selection." << std::endl;
250  doEvent = false;
251  break;
252  }
253 
254  return doEvent;
255 }
256 
257 
258 
260 
261  const UInt_t triggerTimeNsExpected = usefulPat->getWaisDivideTriggerTimeNs();
262  const int maxSeparationMeters = 1e6; // 1000 km
263  const double c_ns = 0.3; // speed of light in nano-seconds
264  Bool_t isWais = false;
265  if(triggerTimeNsExpected < maxSeparationMeters/c_ns){
266 
267  const Double_t maxDeltaTriggerTimeNs = 1200;
268  UInt_t triggerTimeNs = header->triggerTimeNs;
269  Int_t deltaTriggerTimeNs = Int_t(triggerTimeNs) - Int_t(triggerTimeNsExpected);
270 
271  isWais = TMath::Abs(deltaTriggerTimeNs) < maxDeltaTriggerTimeNs;
272  // std::cerr << __PRETTY_FUNCTION__ << "\t" << isWais << "\t" << triggerTimeNs << "\t" << triggerTimeNsExpected << std::endl;
273  }
274  return isWais;
275 }
276 
277 
278 
279 
280 
281 
283 
284  const UInt_t triggerTimeNsExpected = usefulPat->getLDBTriggerTimeNs();
285  const int maxSeparationMeters = 1e6; // 1000 km
286  const double c_ns = 0.3; // speed of light in nano-seconds
287  Bool_t isLDB = false;
288 
289  if(triggerTimeNsExpected < maxSeparationMeters/c_ns){
290 
291  const double cutTimeNs = 1200;
292  Int_t delay=0;
293 
294  if (fRun<150){ // LDB VPOL
295  delay = 25000000; // V-POL pulse at 25 ms runs 145-149
296  }
297  else if (fRun<154){ // LDB HPOL
298  delay = 50000000; // H-POL pulse at 50 ms
299  }
300  else if (fRun<172){ // LDB VPOL
301  delay = 50000000; // V-POL pulse at 50 ms runs 154-171
302  }
303 
304  const Int_t delayGenerator = 199996850; // delay generator was not exactly 200 ms
305  const Int_t constdelay = 500;
306 
307  Int_t deltaTriggerTimeNs = Int_t(header->triggerTimeNs) - Int_t(triggerTimeNsExpected);
308  deltaTriggerTimeNs = deltaTriggerTimeNs%(delayGenerator) - delay - constdelay;
309 
310  isLDB = TMath::Abs(deltaTriggerTimeNs) < cutTimeNs;
311  }
312  return isLDB;
313 }
314 
315 
316 
317 
318 
320 
321  if(isPulserWAIS(header, usefulPat)){
322  sum->flags.pulser = AnitaEventSummary::EventFlags::WAIS;
323  }
324  else if(isPulserLDB(header, usefulPat)){
325  sum->flags.pulser = AnitaEventSummary::EventFlags::LDB;
326  }
327  // std::cerr << __PRETTY_FUNCTION__ << ": I just set the flags to be " << sum->flags.pulser << std::endl;
328 }
329 
330 
331 
332 void Acclaim::AnalysisFlow::prepareEverything(const char* preferredSettingsFileName){
333 
334  if(fRun >= 257 && fRun <= 263){
335  if(AnitaVersion::get()==3){
336  std::cerr << "No ANITA-3 data for run " << fRun << " so won't do analysis" << std::endl;
337  return;
338  }
339  }
340 
341  if(!fSettings){
342  fSettings = new AnalysisSettings(preferredSettingsFileName);
343  fSettings->apply(dynamic_cast<TObject*>(this));
344  }
345 
346  if(!fReco){
347  fReco = new AnalysisReco();
348  fSettings->apply(fReco);
349  }
350 
351  if(!fData){ // do this after reading blinding strategy this->fBlindStrat from config file
352  prepareDataSet();
353  }
354 
355  if(!fOutFile){
356  prepareOutputFiles();
357  }
358 
359  fEventSummary = NULL;
360  if(fOutFile && !fSumTree){
361  fSumTree = new TTree("sumTree", "Tree of AnitaEventSummaries");
362  fSumTree->Branch("sum", &fEventSummary);
363  }
364 
365  if(fFilterStrat){
366  // this doesn't necessarily mean the output will be saved
367  // it will only be saved if operations were added with the optional enable_output bool = true
368  fFilterStrat->attachFile(fOutFile);
369  }
370  else{
371  // empty strategy does nothing
372  fFilterStrat = new FilterStrategy();
373  }
374 
375  fNoiseMonitor = new NoiseMonitor(fFilterStrat);
376  fLastEventConsidered = 0;
377 }
378 
379 
380 
382  int entry = fData->getEvent(eventNumber);
383  return entry > 0 ? doEntry(entry) : NULL;
384 }
385 
386 
387 
389 
390  fData->getEntry(entry);
391 
392  RawAnitaHeader* header = fData->header();
393  UsefulAnitaEvent* usefulEvent = fData->useful();
394 
395  if(fDebug){
396  std::cerr << "Debug in " << __PRETTY_FUNCTION__ << " doing entry " << entry << " for selection " << fSelection
397  << ", header->eventNumber = " << header->eventNumber << ", usefulEvent->eventNumber = " << usefulEvent->eventNumber
398  << std::endl;
399  }
400 
401  // make FourierBuffer filters behave as if we were considering sequential events
402  // this is useful for the decimated data set...
403  if(fLastEventConsidered + 1 != header->eventNumber){
404  Filters::makeFourierBuffersLoadHistoryOnNextEvent(fFilterStrat);
405  }
406 
407  Adu5Pat* pat = fData->gps();
408  UsefulAdu5Pat usefulPat(pat);
409 
410  Bool_t needToReconstruct = shouldIDoThisEvent(header, &usefulPat);
411 
412  fEventSummary = NULL;
413 
414  if(needToReconstruct){
415  fEventSummary = new AnitaEventSummary(header, &usefulPat, fData->truth());
416  Bool_t isGoodEvent = QualityCut::applyAll(usefulEvent, fEventSummary);
417 
418  // varner2 contains a check that all channels have lots of points, too few can crash interpolation
419  if(isGoodEvent || (fDoAll && !fEventSummary->flags.isVarner2)){
420 
421  setPulserFlags(header, &usefulPat, fEventSummary);
422 
423  // since we now have rolling averages make sure the filter strategy is sees every event before deciding whether or not to reconstruct
424  if(fEv){
425  delete fEv;
426  }
427  fEv = new FilteredAnitaEvent(usefulEvent, fFilterStrat, pat, header, false);
428  fReco->process(fEv, fEventSummary, fNoiseMonitor);
429  }
430 
431  if(fSumTree){
432  fSumTree->Fill();
433  }
434  }
435 
436  fLastEventConsidered = header->eventNumber;
437  return fEventSummary;
438 
439 }
440 
441 
442 
443 void Acclaim::AnalysisFlow::doAnalysis(Long64_t startAtThisEntry){
444 
445  fLastEventConsidered = 0;
446 
447  std::cout << "Info in " << __PRETTY_FUNCTION__ << ", doing run " << fRun
448  << " from entry " << fFirstEntry << " to " << fLastEntry << std::endl;
449 
450  if(startAtThisEntry >= fFirstEntry && startAtThisEntry < fLastEntry){
451  std::cout << "Info in " << __PRETTY_FUNCTION__ << ", starting at entry " << startAtThisEntry
452  << " instead of entry " << fFirstEntry << std::endl;
453  fFirstEntry = startAtThisEntry;
454  }
455  else if(startAtThisEntry!=-1){
456  std::cerr << "Warning in " << __PRETTY_FUNCTION__ << " got requested start entry "
457  << startAtThisEntry << ", which lies outside range " << fFirstEntry
458  << " to " << fLastEntry << ". Ignoring request!" << std::endl;
459  }
460 
461  const Long64_t numEntries = fLastEntry-fFirstEntry;
462  ProgressBar p(numEntries);
463  for(Long64_t entryInd = 0; entryInd < numEntries; entryInd++){
464  Long64_t entry = fFirstEntry + entryInd;
465  AnitaEventSummary* sum = doEntry(entry);
466 
467  if(sum){
468  delete sum;
469  }
470  p.inc(entryInd, numEntries);
471  }
472 }
UInt_t eventNumber
Event number from software.
Definition: RawAnitaEvent.h:33
Bool_t checkForSgeTaskId()
Look for an environmental variable called SGE_TASK_ID for running on the hoffman2 cluster...
bool isLDB(const RawAnitaHeader *hdr, const AnalysisConfig *cfg=0)
Definition: Util.cc:92
Bool_t isPulserWAIS(RawAnitaHeader *header, UsefulAdu5Pat *usefulPat)
void prepareDataSet()
Create the data set if not already done.
selection fSelection
Which event selection is applied?
Definition: AnalysisFlow.h:144
int fRun
Which run in the data set should be is being processed?
Definition: AnalysisFlow.h:145
static Bool_t applyAll(const UsefulAnitaEvent *usefulEvent, AnitaEventSummary *sum=NULL)
Applies all the event quality cuts in succession, this should be the primary interface.
Definition: QualityCut.cxx:45
UInt_t triggerTimeNs
Trigger time in ns from TURF.
selection
A human readable method of selecting the set of events to process, this can be expanded as required...
Definition: AnalysisFlow.h:42
Adu5Pat – The ADU5 Position and Attitude Data.
Definition: Adu5Pat.h:26
void setPulserFlags(RawAnitaHeader *header, UsefulAdu5Pat *usefulPat, AnitaEventSummary *sum)
Does the event reconstruction, and produces a summary of it.
Definition: AnalysisReco.h:30
UInt_t getWaisDivideTriggerTimeNs() const
Gets the time of flight to Taylor Dome.
AnalysisFlow(CmdLineArgs *args, FilterStrategy *filterStrat=NULL)
Preferred constructor.
UInt_t fLastEventConsidered
Tracks the event numbers processed by the class.
Definition: AnalysisFlow.h:159
~AnalysisFlow()
Destructor.
TFile * makeFile()
Create the new output file with proper name.
void prepareOutputFiles()
Coax the OutputConvention class into creating appropriately named output files.
Int_t fDebug
Controls the printing of debug info, feel free to wrap noisy stderr messages with if(fDebug)...
Definition: AnalysisFlow.h:258
AnalysisReco * fReco
The reconstruction class, fills an AnitaEventSummary.
Definition: AnalysisFlow.h:153
Bool_t isPulserLDB(RawAnitaHeader *header, UsefulAdu5Pat *usefulPat)
A filter strategy defines the sets of filters that are used and provides some introspection abilities...
High level interface to join up the various bits of the analysis.
Definition: AnalysisFlow.h:34
A class to systematically name files produced by my analysis programs.
RawAnitaHeader – The Raw ANITA Event Header.
Long64_t fFirstEntry
The first entry to process (derived from fDivision and fNumDivisions)
Definition: AnalysisFlow.h:150
int fNumDivisions
For parallel processing the run is divided into this many portions.
Definition: AnalysisFlow.h:146
A simple command line option parser.
void inc(Long64_t &entry, Long64_t numEntries=-1)
New primary function to move through main for loop in analysis program.
AnitaDataset * fData
Rootified data handler.
Definition: AnalysisFlow.h:149
UInt_t getLDBTriggerTimeNs() const
Gets the time of flight to Siple.
What you should call for analysis work.
FilteredAnitaEvent * fEv
The most recently produced FilteredAnitaEvent, updated after each event processed.
Definition: AnalysisFlow.h:157
TTree * fSumTree
The produced TTree of AnitaEventSummary.
Definition: AnalysisFlow.h:163
AnalysisSettings * fSettings
Contains configurable numbers parsed from an Acclaim analysis settings file.
Definition: AnalysisFlow.h:155
Prints a progress bar and timer to stderr.
Definition: ProgressBar.h:25
UsefulAnitaEvent – The Calibrated Useful Anita Event object.
TFile * fOutFile
the output file, will contain TTree of AnitaEventSummary
Definition: AnalysisFlow.h:162
AnitaDataset::BlindingStrategy fBlindStrat
The blinding strategy with which to initialize fData.
Definition: AnalysisFlow.h:302
Int_t fOutFileCompressionLevel
What compression level for the ROOT output files? There&#39;s no reason not to set this to the maximum po...
Definition: AnalysisFlow.h:356
TString fOutFileBaseName
The meat of the output file name.
Definition: AnalysisFlow.h:161
As UsefulAnitaEvent is to RawAnitaEvent, UsefulAdu5Pat is to Adu5Pat. Well not quite as useful but yo...
Definition: UsefulAdu5Pat.h:39
int fDivision
Which portion of the run to process (runs from 0 to fNumDivisions)
Definition: AnalysisFlow.h:147
FilterStrategy * fFilterStrat
Which filter strategy is applied to the events.
Definition: AnalysisFlow.h:154
void doAnalysis(Long64_t startAtThisEntry=-1)
Common analysis format between UCorrelator and Acclaim.
AnitaEventSummary * doEvent(UInt_t eventNumber)
Long64_t fLastEntry
The final entry to process (derived from fDivision and fNumDivisions)
Definition: AnalysisFlow.h:151
Bool_t shouldIDoThisEvent(RawAnitaHeader *header, UsefulAdu5Pat *usefulPat)
Applies high level event selection.
void prepareEverything(const char *preferredSettingsFileName=NULL)
Initialize all input and output objects.
AnitaEventSummary * doEntry(Long64_t entry)
This class is intended to store all the necessary data about an ANITA event for filtering and analysi...
Int_t fOutFileCompressionAlgo
Which compression algorithm for the ROOT output files?
Definition: AnalysisFlow.h:373
AnitaEventSummary * fEventSummary
The most recently filled AnitaEventSummary, updated after each event processed.
Definition: AnalysisFlow.h:156