from . import fs from . import const from . import util from .acars import ACARS from .sound import startSound import time #--------------------------------------------------------------------------------------- ## @package mlx.checks # # The classes that check the state of the aircraft. # # During the flight the program periodically queries various data from the # simulator. This data is returned in instances of the \ref # mlx.fs.AircraftState class and passed to the various "checkers", # i.e. instances of subclasses of the \ref StateChecker class. These checkers # perform various checks to see if the aircraft's parameters are within the # expected limits, or some of them just logs something. # # There are a few special ones, such as \ref StageChecker which computes the # transitions from one stage of the flight to the next one. Or \ref ACARSSender # which sends the ACARS periodically #--------------------------------------------------------------------------------------- class StateChecker(object): """Base class for classes the instances of which check the aircraft's state from some aspect. As a result of the check they may log something, or notify of some fault, etc.""" def check(self, flight, aircraft, logger, oldState, state): """Perform the check and do whatever is needed. This default implementation raises a NotImplementedError.""" raise NotImplementedError() #--------------------------------------------------------------------------------------- class StageChecker(StateChecker): """Check the flight stage transitions.""" def __init__(self): """Construct the stage checker.""" self._flareStarted = False def check(self, flight, aircraft, logger, oldState, state): """Check the stage of the aircraft.""" stage = flight.stage if stage==None: aircraft.setStage(state, const.STAGE_BOARDING) elif stage==const.STAGE_BOARDING: if not state.parking or \ (not state.trickMode and state.groundSpeed>5.0) or \ (flight.departureGateIsTaxiThrough and state.antiCollisionLightsOn): aircraft.setStage(state, const.STAGE_PUSHANDTAXI) elif stage==const.STAGE_TAKEOFF: if not state.gearsDown or \ (state.radioAltitude>3000.0 and state.vs>0): aircraft.setStage(state, const.STAGE_CLIMB) elif not state.landingLightsOn and \ (state.strobeLightsOn is False or (state.strobeLightsOn is None and state.xpdrC is False)) and \ state.onTheGround and \ state.groundSpeed<50.0: aircraft.setStage(state, const.STAGE_RTO) elif stage==const.STAGE_CLIMB: if (state.altitude+2000) > flight.cruiseAltitude: aircraft.setStage(state, const.STAGE_CRUISE) elif state.radioAltitude<2000.0 and \ state.vs < 0.0 and state.gearsDown: aircraft.setStage(state, const.STAGE_LANDING) elif stage==const.STAGE_CRUISE: if (state.altitude+2000) < flight.cruiseAltitudeForDescent: aircraft.setStage(state, const.STAGE_DESCENT) elif stage==const.STAGE_DESCENT or stage==const.STAGE_GOAROUND: if state.gearsDown and state.radioAltitude<2000.0: aircraft.setStage(state, const.STAGE_LANDING) elif (state.altitude+2000) > flight.cruiseAltitude: aircraft.setStage(state, const.STAGE_CRUISE) elif stage==const.STAGE_LANDING: if state.onTheGround and state.groundSpeed<25.0: aircraft.setStage(state, const.STAGE_TAXIAFTERLAND) elif not state.gearsDown: aircraft.setStage(state, const.STAGE_GOAROUND) elif state.radioAltitude>200 and self._flareStarted: aircraft.cancelFlare() self._flareStarted = False elif state.radioAltitude<150 and not state.onTheGround and \ not self._flareStarted: self._flareStarted = True aircraft.prepareFlare() elif stage==const.STAGE_TAXIAFTERLAND: if state.parking and aircraft.checkFlightEnd(state): aircraft.setStage(state, const.STAGE_END) #--------------------------------------------------------------------------------------- class ACARSSender(StateChecker): """Sender of online ACARS. It sends the ACARS every 3 minutes to the MAVA website.""" ## The interval at which the ACARS are sent INTERVAL = 3*60.0 def __init__(self, gui): """Construct the ACARS sender.""" self._gui = gui self._lastSent = None self._sending = False def check(self, flight, aircraft, logger, oldState, state): """If the time has come to send the ACARS, send it.""" if self._sending: return now = time.time() if self._lastSent is not None and \ (self._lastSent + ACARSSender.INTERVAL)>now: return self._sending = True acars = ACARS(self._gui, state) self._gui.webHandler.sendACARS(self._acarsCallback, acars) def _acarsCallback(self, returned, result): """Callback for ACARS sending.""" if returned: print("Sent online ACARS") self._lastSent = time.time() if self._lastSent is None \ else self._lastSent + ACARSSender.INTERVAL else: print("Failed to send the ACARS") self._sending = False #--------------------------------------------------------------------------------------- class TakeOffLogger(StateChecker): """Logger for the cruise speed.""" def __init__(self): """Construct the logger.""" self._onTheGround = True def check(self, flight, aircraft, logger, oldState, state): """Log the cruise speed if necessary.""" if flight.stage==const.STAGE_TAKEOFF and \ self._onTheGround and not state.onTheGround: logger.message(state.timestamp, "Takeoff speed: %.0f %s" % \ (flight.speedFromKnots(state.ias), flight.getEnglishSpeedUnit())) logger.message(state.timestamp, "Takeoff heading: %03.0f degrees" % (state.heading,)) logger.message(state.timestamp, "Takeoff pitch: %.1f degrees" % (state.pitch,)) logger.message(state.timestamp, "Takeoff flaps: %.0f" % (state.flapsSet)) logger.message(state.timestamp, "CG/Trim: %.1f%%/%.2f" % \ (state.cog*100.0, state.elevatorTrim)) if abs(state.pitch)>aircraft.maxTakeOffPitch: flight.handleNoGo("TOPitch", state.timestamp, "Takeoff pitch higher than aircraft maximum (%.2f)" % \ (aircraft.maxTakeOffPitch,), "TO TAILSTRIKE NO GO") self._onTheGround = False #--------------------------------------------------------------------------------------- class InitialClimbSpeedLogger(StateChecker): """Logger for the initial climb speed.""" def __init__(self): """Construct the logger.""" self._logged = False def check(self, flight, aircraft, logger, oldState, state): """Log the initial climb speed if the altitude is reached.""" if not self._logged and \ state.radioAltitude>=aircraft.initialClimbSpeedAltitude and \ flight.stage in [const.STAGE_TAKEOFF, const.STAGE_CLIMB]: logger.message(state.timestamp, "Initial climb speed: %.0f %s - %.0f %s AGL" % \ (flight.speedFromKnots(state.ias), flight.getEnglishSpeedUnit(), flight.aglFromFeet(state.radioAltitude), flight.getEnglishAGLUnit())) self._logged = True #--------------------------------------------------------------------------------------- class CruiseSpeedLogger(StateChecker): """Logger for the cruise speed.""" def __init__(self): """Construct the logger.""" self._lastTime = None def check(self, flight, aircraft, logger, oldState, state): """Log the cruise speed if necessary.""" if flight.stage==const.STAGE_CRUISE and \ (self._lastTime is None or \ (self._lastTime+800)<=state.timestamp): if state.altitude>24500.0: logger.message(state.timestamp, "Cruise speed: %.3f mach" % (state.mach,)) else: logger.message(state.timestamp, "Cruise speed: %.0f %s" % (flight.speedFromKnots(state.ias), flight.getEnglishSpeedUnit())) self._lastTime = state.timestamp #--------------------------------------------------------------------------------------- class SpoilerLogger(StateChecker): """Logger for the spoiler.""" def __init__(self): """Construct the logger.""" self._spoilersDown = True self._logged = False self._spoilersExtension = None def check(self, flight, aircraft, logger, oldState, state): """Log the cruise speed if necessary.""" spoilersDown = state.spoilersExtension==0 if flight.stage==const.STAGE_LANDING and state.onTheGround: if not self._logged: if not spoilersDown and self._spoilersDown: logger.message(state.timestamp, "Speedbrake deployed") self._logged = True config = flight.config if config.enableSounds and config.speedbrakeAtTD: startSound(const.SOUND_SPEEDBRAKE) elif spoilersDown!=self._spoilersDown: logger.message(state.timestamp, "Speedbrake " + ("down" if spoilersDown else "up")) self._spoilersDown = spoilersDown #--------------------------------------------------------------------------------------- class VisibilityChecker(StateChecker): """Inform the pilot of the visibility once when descending below 2000 ft, then when descending below 1000 ft.""" def __init__(self): """Construct the visibility checker.""" self._informedBelow2000 = False self._informedBelow1000 = False def check(self, flight, aircraft, logger, oldState, state): """Check if we need to inform the pilot of the visibility.""" if flight.stage==const.STAGE_DESCENT or \ flight.stage==const.STAGE_LANDING: if (state.radioAltitude<2000 and not self._informedBelow2000) or \ (state.radioAltitude<1000 and not self._informedBelow1000): visibilityString = util.visibility2String(state.visibility) fs.sendMessage(const.MESSAGETYPE_VISIBILITY, "Current visibility: " + visibilityString, 5) logger.message(state.timestamp, "Pilot was informed about the visibility: " + visibilityString) self._informedBelow2000 = True self._informedBelow1000 = state.radioAltitude<1000 #--------------------------------------------------------------------------------------- class ApproachCalloutsPlayer(StateChecker): """A state checker that plays a sequence of approach callouts. It tracks the altitude during the descent and landing phases and if the altitude crosses one that has a callout associated with and the vertical speed is negative, that callout will be played.""" def __init__(self, approachCallouts): """Construct the approach callouts player.""" self._approachCallouts = approachCallouts self._altitudes = approachCallouts.getAltitudes(descending = False) def check(self, flight, aircraft, logger, oldState, state): """Check if we need to play a callout.""" if (flight.stage==const.STAGE_DESCENT or \ flight.stage==const.STAGE_LANDING) and state.vs<0: oldRadioAltitude = oldState.radioAltitude radioAltitude = state.radioAltitude for altitude in self._altitudes: if radioAltitude<=altitude and \ oldRadioAltitude>altitude: startSound(self._approachCallouts[altitude]) break #--------------------------------------------------------------------------------------- class StateChangeLogger(StateChecker): """Base class for classes the instances of which check if a specific change has occured in the aircraft's state, and log such change.""" def __init__(self, logInitial = True, excludedStages = None): """Construct the logger. If logInitial is True, the initial value will be logged, not just the changes later. If excludedStages is given, it should be a list containing those stages during which the changes should not be logged. Child classes should define the following functions: - _changed(self, oldState, state): returns a boolean indicating if the value has changed or not - _getMessage(self, flight, state, forced): return a strings containing the message to log with the new value """ self._logInitial = logInitial self._excludedStages = [] if excludedStages is None else excludedStages def _getLogTimestamp(self, state, forced): """Get the log timestamp.""" return state.timestamp def check(self, flight, aircraft, logger, oldState, state): """Check if the state has changed, and if so, log the new state.""" if flight.stage in self._excludedStages: return shouldLog = False if oldState is None: shouldLog = self._logInitial else: shouldLog = self._changed(oldState, state) if shouldLog: self.logState(flight, logger, state) def logState(self, flight, logger, state, forced = False): """Log the state.""" message = self._getMessage(flight, state, forced) if message is not None: logger.message(self._getLogTimestamp(state, forced), message) #------------------------------------------------------------------------------- class SimpleChangeMixin(object): """A mixin that defines a _changed() function which simply calls a function to retrieve the value two monitor for both states and compares them. Child classes should define the following function: - _getValue(state): get the value we are interested in.""" def _changed(self, oldState, state): """Determine if the value has changed.""" currentValue = self._getValue(state) return currentValue is not None and self._getValue(oldState)!=currentValue #--------------------------------------------------------------------------------------- class SingleValueMixin(object): """A mixin that provides a _getValue() function to query a value from the state using the name of the attribute.""" def __init__(self, attrName): """Construct the mixin with the given attribute name.""" self._attrName = attrName def _getValue(self, state): """Get the value of the attribute from the state.""" return getattr(state, self._attrName) #--------------------------------------------------------------------------------------- class DelayedChangeMixin(object): """A mixin to a StateChangeLogger that stores the old value and reports a change only if the change has been there for a certain amount of time. Child classes should define the following function: - _getValue(state): get the value we are interested in.""" def __init__(self, minDelay = 3.0, maxDelay = 10.0): """Construct the mixin with the given delay in seconds.""" self._minDelay = minDelay self._maxDelay = maxDelay self._oldValue = None self._firstChange = None self._lastChangeState = None self._lastLoggedValue = None self._logState = lambda flight, logger, state, forced = False: \ StateChangeLogger.logState(self, flight, logger, state, forced = forced) self.logState = lambda flight, logger, state, forced = False: \ DelayedChangeMixin.logState(self, flight, logger, state, forced = forced) def _changed(self, oldState, state): """Determine if the value has changed.""" if self._oldValue is None: self._oldValue = self._getValue(oldState) newValue = self._getValue(state) if self._isDifferent(self._oldValue, newValue): if self._lastLoggedValue is not None and \ not self._isDifferent(self._lastLoggedValue, newValue): self._firstChange = None else: if self._firstChange is None: self._firstChange = state.timestamp self._lastChangeState = state self._oldValue = newValue if self._firstChange is not None: if state.timestamp >= min(self._lastChangeState.timestamp + self._minDelay, self._firstChange + self._maxDelay): self._firstChange = None return True return False def _getLogTimestamp(self, state, forced): """Get the log timestamp.""" return self._lastChangeState.timestamp \ if not forced and self._lastChangeState is not None \ else state.timestamp def _isDifferent(self, oldValue, newValue): """Determine if the given values are different. This default implementation checks for simple equality.""" return oldValue!=newValue def logState(self, flight, logger, state, forced): """Log the given state. The value belonging to that state is recorded in _lastLoggedValue. It calls _logState to perform the real logging.""" self._lastLoggedValue = self._getValue(state) self._logState(flight, logger, state, forced = forced) #--------------------------------------------------------------------------------------- class TemplateMessageMixin(object): """Mixin to generate a message based on a template. Child classes should define the following function: - _getValue(state): get the value we are interested in.""" def __init__(self, template): """Construct the mixin.""" self._template = template def _getMessage(self, flight, state, forced): """Get the message.""" value = self._getValue(state) return None if value is None else self._template % (value,) #--------------------------------------------------------------------------------------- class GenericStateChangeLogger(StateChangeLogger, SingleValueMixin, DelayedChangeMixin, TemplateMessageMixin): """Base for generic state change loggers that monitor a single value in the state possibly with a delay and the logged message comes from a template""" def __init__(self, attrName, template, logInitial = True, excludedStages = None, minDelay = 0.0, maxDelay = 0.0): """Construct the object.""" StateChangeLogger.__init__(self, logInitial = logInitial, excludedStages = excludedStages) SingleValueMixin.__init__(self, attrName) DelayedChangeMixin.__init__(self, minDelay = minDelay, maxDelay = maxDelay) TemplateMessageMixin.__init__(self, template) self._getLogTimestamp = \ lambda state, forced: \ DelayedChangeMixin._getLogTimestamp(self, state, forced) #--------------------------------------------------------------------------------------- class ForceableLoggerMixin(object): """A mixin for loggers that can be forced to log a certain state.""" def forceLog(self, flight, logger, state): """Force logging the given state.""" self.logState(flight, logger, state, forced = True) #--------------------------------------------------------------------------------------- class AltimeterLogger(StateChangeLogger, SingleValueMixin, DelayedChangeMixin): """Logger for the altimeter setting.""" def __init__(self): """Construct the logger.""" StateChangeLogger.__init__(self, logInitial = True) SingleValueMixin.__init__(self, "altimeter") DelayedChangeMixin.__init__(self) self._getLogTimestamp = \ lambda state, forced: \ DelayedChangeMixin._getLogTimestamp(self, state, forced) def _getMessage(self, flight, state, forced): """Get the message to log on a change.""" logState = self._lastChangeState if \ self._lastChangeState is not None else state message = "Altimeter: %.2f hPa at %.0f feet" % \ (logState.altimeter, logState.altitude) if not logState.altimeterReliable: message += " (u/r)" return message #--------------------------------------------------------------------------------------- class NAVLogger(StateChangeLogger, DelayedChangeMixin, ForceableLoggerMixin): """Logger for NAV radios. It also logs the OBS radial set.""" excludedStages = [const.STAGE_BOARDING, const.STAGE_PUSHANDTAXI, const.STAGE_RTO, const.STAGE_TAXIAFTERLAND, const.STAGE_PARKING] @staticmethod def getMessage(logName, frequency, obs): """Get the message for the given NAV radio setting.""" message = "%-5s %s" % (logName + ":", frequency) if obs is not None: message += " (%03d\u00b0)" % (obs,) return message def __init__(self, attrName, logName): """Construct the NAV logger.""" StateChangeLogger.__init__(self, logInitial = False, excludedStages = self.excludedStages) DelayedChangeMixin.__init__(self) self._getLogTimestamp = \ lambda state, forced: \ DelayedChangeMixin._getLogTimestamp(self, state, forced) self._attrName = attrName self._logName = logName def _getValue(self, state): """Get the value. If both the frequency and the obs settings are available, a tuple containing them is returned, otherwise None.""" frequency = getattr(state, self._attrName) obs = getattr(state, self._attrName + "_obs") manual = getattr(state, self._attrName + "_manual") return (frequency, obs, manual) def _getMessage(self, flight, state, forced): """Get the message.""" (frequency, obs, manual) = self._getValue(state) return None if frequency is None or obs is None or \ (not manual and not forced) else \ self.getMessage(self._logName, frequency, obs) def _isDifferent(self, oldValue, newValue): """Determine if the valie has changed between the given states.""" (oldFrequency, oldOBS, _oldManual) = oldValue (newFrequency, newOBS, _newManual) = newValue return oldFrequency!=newFrequency or oldOBS!=newOBS #--------------------------------------------------------------------------------------- class ILSLogger(NAVLogger): """Logger for the ILS radio setting.""" def __init__(self): """Construct the logger.""" super(ILSLogger, self).__init__("ils", "ILS") #--------------------------------------------------------------------------------------- class NAV1Logger(NAVLogger): """Logger for the NAV1 radio setting.""" def __init__(self): """Construct the logger.""" super(NAV1Logger, self).__init__("nav1", "NAV1") #--------------------------------------------------------------------------------------- class NAV2Logger(NAVLogger): """Logger for the NAV2 radio setting.""" def __init__(self): """Construct the logger.""" super(NAV2Logger, self).__init__("nav2", "NAV2") #--------------------------------------------------------------------------------------- class ADFLogger(GenericStateChangeLogger, ForceableLoggerMixin): """Base class for the ADF loggers.""" def __init__(self, attr, logName): """Construct the ADF logger.""" GenericStateChangeLogger.__init__(self, attr, "%s: %%s" % (logName,), logInitial = False, excludedStages = NAVLogger.excludedStages, minDelay = 3.0, maxDelay = 10.0) #--------------------------------------------------------------------------------------- class ADF1Logger(ADFLogger): """Logger for the ADF1 radio setting.""" def __init__(self): """Construct the logger.""" super(ADF1Logger, self).__init__("adf1", "ADF1") #--------------------------------------------------------------------------------------- class ADF2Logger(ADFLogger): """Logger for the ADF2 radio setting.""" def __init__(self): """Construct the logger.""" super(ADF2Logger, self).__init__("adf2", "ADF2") #--------------------------------------------------------------------------------------- class SquawkLogger(GenericStateChangeLogger): """Logger for the squawk setting.""" def __init__(self): """Construct the logger.""" super(SquawkLogger, self).__init__("squawk", "Squawk code: %s", minDelay = 3.0, maxDelay = 10.0) #--------------------------------------------------------------------------------------- class LightsLogger(StateChangeLogger, SingleValueMixin, DelayedChangeMixin): """Base class for the loggers of the various lights.""" def __init__(self, attrName, template): """Construct the logger.""" StateChangeLogger.__init__(self) SingleValueMixin.__init__(self, attrName) DelayedChangeMixin.__init__(self) self._getLogTimestamp = \ lambda state, forced: \ DelayedChangeMixin._getLogTimestamp(self, state, forced) self._template = template def _getMessage(self, flight, state, forced): """Get the message from the given state.""" value = self._getValue(state) if value is None: return None else: return self._template % ("ON" if value else "OFF") #--------------------------------------------------------------------------------------- class AnticollisionLightsLogger(LightsLogger): """Logger for the anti-collision lights.""" def __init__(self): LightsLogger.__init__(self, "antiCollisionLightsOn", "Anti-collision lights: %s") #--------------------------------------------------------------------------------------- class LandingLightsLogger(LightsLogger): """Logger for the landing lights.""" def __init__(self): LightsLogger.__init__(self, "landingLightsOn", "Landing lights: %s") #--------------------------------------------------------------------------------------- class StrobeLightsLogger(LightsLogger): """Logger for the strobe lights.""" def __init__(self): LightsLogger.__init__(self, "strobeLightsOn", "Strobe lights: %s") #--------------------------------------------------------------------------------------- class NavLightsLogger(LightsLogger): """Logger for the navigational lights.""" def __init__(self): LightsLogger.__init__(self, "navLightsOn", "Navigational lights: %s") #--------------------------------------------------------------------------------------- class FlapsLogger(StateChangeLogger, SingleValueMixin, DelayedChangeMixin): """Logger for the flaps setting.""" def __init__(self): """Construct the logger.""" StateChangeLogger.__init__(self, logInitial = True, excludedStages = [const.STAGE_BOARDING, const.STAGE_PUSHANDTAXI, const.STAGE_RTO, const.STAGE_TAKEOFF]) SingleValueMixin.__init__(self, "flapsSet") DelayedChangeMixin.__init__(self) self._getLogTimestamp = \ lambda state, forced: \ DelayedChangeMixin._getLogTimestamp(self, state, forced) def _getMessage(self, flight, state, forced): """Get the message to log on a change.""" logState = self._lastChangeState if \ self._lastChangeState is not None else state speed = logState.groundSpeed if logState.groundSpeed<80.0 \ else logState.ias message = "Flaps %.0f - %.0f %s, %.0f %s AGL, %.0f ft AMSL" % \ (logState.flapsSet, flight.speedFromKnots(speed), flight.getEnglishSpeedUnit(), flight.aglFromFeet(logState.radioAltitude), flight.getEnglishAGLUnit(), logState.altitude) return message #--------------------------------------------------------------------------------------- class GearsLogger(StateChangeLogger, SingleValueMixin, SimpleChangeMixin): """Logger for the gears state.""" def __init__(self): """Construct the logger.""" StateChangeLogger.__init__(self, logInitial = True) SingleValueMixin.__init__(self, "gearControlDown") def _getMessage(self, flight, state, forced): """Get the message to log on a change.""" return "Gears SET to %s at %.0f %s, %.0f ft, %.0f %s AGL" % \ ("DOWN" if state.gearControlDown else "UP", flight.speedFromKnots(state.ias), flight.getEnglishSpeedUnit(), state.altitude, flight.aglFromFeet(state.radioAltitude), flight.getEnglishAGLUnit()) #--------------------------------------------------------------------------------------- class APLogger(StateChangeLogger, DelayedChangeMixin): """Log the state of the autopilot.""" @staticmethod def _logBoolean(logger, timestamp, what, value): """Log a boolean value. what is the name of the value that is being logged.""" message = what + " " if value is None: message += "cannot be detected, will not log" else: message += "is " + ("ON" if value else "OFF") logger.message(timestamp, message) @staticmethod def _logNumeric(logger, timestamp, what, format, value): """Log a numerical value.""" message = what if value is None: message += " cannot be detected, will not log" else: message += ": " + format % (value,) logger.message(timestamp, message) @staticmethod def _logAPMaster(logger, timestamp, state): """Log the AP master state.""" APLogger._logBoolean(logger, timestamp, "AP master", state.apMaster) @staticmethod def _logAPHeadingHold(logger, timestamp, state): """Log the AP heading hold state.""" APLogger._logBoolean(logger, timestamp, "AP heading hold", state.apHeadingHold) @staticmethod def _logAPHeading(logger, timestamp, state): """Log the AP heading.""" APLogger._logNumeric(logger, timestamp, "AP heading", "%03.0f\u00b0", state.apHeading) @staticmethod def _logAPAltitudeHold(logger, timestamp, state): """Log the AP altitude hold state.""" APLogger._logBoolean(logger, timestamp, "AP altitude hold", state.apAltitudeHold) @staticmethod def _logAPAltitude(logger, timestamp, state): """Log the AP heading.""" APLogger._logNumeric(logger, timestamp, "AP altitude", "%.0f ft", state.apAltitude) def __init__(self): """Construct the state logger.""" StateChangeLogger.__init__(self) DelayedChangeMixin.__init__(self) self._lastLoggedState = None self.logState = lambda flight, logger, state:\ APLogger.logState(self, flight, logger, state) def _getValue(self, state): """Convert the relevant values from the given state into a tuple.""" return (state.apMaster, state.apHeadingHold, state.apHeading, state.apAltitudeHold, state.apAltitude) def _isDifferent(self, oldValue, newValue): """Determine if the given old and new values are different (enough) to be logged.""" (oldAPMaster, oldAPHeadingHold, oldAPHeading, oldAPAltitudeHold, oldAPAltitude) = oldValue (apMaster, apHeadingHold, apHeading, apAltitudeHold, apAltitude) = newValue if apMaster is not None and apMaster!=oldAPMaster: return True if apMaster is False: return False if apHeadingHold is not None and apHeadingHold!=oldAPHeadingHold: return True if apHeadingHold is not False and apHeading is not None and \ apHeading!=oldAPHeading: return True if apAltitudeHold is not None and apAltitudeHold!=oldAPAltitudeHold: return True if apAltitudeHold is not False and apAltitude is not None and \ apAltitude!=oldAPAltitude: return True return False def logState(self, flight, logger, state): """Log the autopilot state.""" timestamp = DelayedChangeMixin._getLogTimestamp(self, state, False) if self._lastLoggedState is None: self._logAPMaster(logger, timestamp, state) if state.apMaster is not False or state.apHeadingHold is None: self._logAPHeadingHold(logger, timestamp, state) if state.apMaster is not False and \ (state.apHeadingHold is not False or state.apHeading is None): self._logAPHeading(logger, timestamp, state) if state.apMaster is not False or state.apAltitudeHold is None: self._logAPAltitudeHold(logger, timestamp, state) if state.apMaster is not False and \ (state.apAltitudeHold is not False or state.apAltitude is None): self._logAPAltitude(logger, timestamp, state) self._firstCall = False else: oldState = self._lastLoggedState apMasterTurnedOn = False if state.apMaster is not None and state.apMaster!=oldState.apMaster: apMasterTurnedOn = state.apMaster self._logAPMaster(logger, timestamp, state) if state.apMaster is not False: apHeadingHoldTurnedOn = False if state.apHeadingHold is not None and \ (state.apHeadingHold!=oldState.apHeadingHold or apMasterTurnedOn): apHeadingHoldTurnedOn = state.apHeadingHold self._logAPHeadingHold(logger, timestamp, state) if state.apHeadingHold is not False and \ state.apHeading is not None and \ (state.apHeading!=oldState.apHeading or apMasterTurnedOn or apHeadingHoldTurnedOn): self._logAPHeading(logger, timestamp, state) apAltitudeHoldTurnedOn = False if state.apAltitudeHold is not None and \ (state.apAltitudeHold!=oldState.apAltitudeHold or apMasterTurnedOn): apAltitudeHoldTurnedOn = state.apAltitudeHold self._logAPAltitudeHold(logger, timestamp, state) if state.apAltitudeHold is not False and \ state.apAltitude is not None and \ (state.apAltitude!=oldState.apAltitude or apMasterTurnedOn or apAltitudeHoldTurnedOn): self._logAPAltitude(logger, timestamp, state) self._lastLoggedState = state #--------------------------------------------------------------------------------------- class FaultChecker(StateChecker): """Base class for checkers that look for faults.""" @staticmethod def _appendDuring(flight, message): """Append a 'during XXX' test to the given message, depending on the flight stage.""" stageStr = const.stage2string(flight.stage) return message if stageStr is None \ else (message + " during " + stageStr.upper()) @staticmethod def _getLinearScore(minFaultValue, maxFaultValue, minScore, maxScore, value): """Get the score for a faulty value where the score is calculated linearly within a certain range.""" if valuemaxFaultValue: return maxScore else: return minScore + (maxScore-minScore) * (value-minFaultValue) / \ (maxFaultValue - minFaultValue) #--------------------------------------------------------------------------------------- class SimpleFaultChecker(FaultChecker): """Base class for fault checkers that check for a single occurence of a faulty condition. Child classes should implement the following functions: - isCondition(self, flight, aircraft, oldState, state): should return whether the condition holds - logFault(self, flight, aircraft, logger, oldState, state): log the fault via the logger.""" def check(self, flight, aircraft, logger, oldState, state): """Perform the check.""" if self.isCondition(flight, aircraft, oldState, state): self.logFault(flight, aircraft, logger, oldState, state) #--------------------------------------------------------------------------------------- class PatientFaultChecker(FaultChecker): """A fault checker that does not decides on a fault when the condition arises immediately, but can wait some time. Child classes should implement the following functions: - isCondition(self, flight, aircraft, oldState, state): should return whether the condition holds - logFault(self, flight, aircraft, logger, oldState, state): log the fault via the logger """ def __init__(self, timeout = 2.0): """Construct the fault checker with the given timeout.""" self._timeout = timeout self._faultStarted = None def getTimeout(self, flight, aircraft, oldState, state): """Get the timeout. This default implementation returns the timeout given in the constructor, but child classes might want to enforce a different policy.""" return self._timeout def check(self, flight, aircraft, logger, oldState, state): """Perform the check.""" if self.isCondition(flight, aircraft, oldState, state): if self._faultStarted is None: self._faultStarted = state.timestamp timeout = self.getTimeout(flight, aircraft, oldState, state) if state.timestamp>=(self._faultStarted + timeout): self.logFault(flight, aircraft, logger, oldState, state) self._faultStarted = state.timestamp else: self._faultStarted = None #--------------------------------------------------------------------------------------- class AntiCollisionLightsChecker(PatientFaultChecker): """Check for the anti-collision light being off at high N1 values.""" def __init__(self): """Construct the anti-collision lights checker.""" super(AntiCollisionLightsChecker, self).__init__(timeout = 5.0) def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return (not flight.config.usingFS2Crew or not state.parking or flight.stage!=const.STAGE_TAXIAFTERLAND) and \ state.antiCollisionLightsOn is False and \ self.isEngineCondition(state) def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleFault(AntiCollisionLightsChecker, state.timestamp, FaultChecker._appendDuring(flight, "Anti-collision lights were %s" % ("on" if state.antiCollisionLightsOn else "off",)), 1) def isEngineCondition(self, state): """Determine if the engines are in such a state that the lights should be on.""" if state.n1 is not None: for n1 in state.n1: if n1 is not None and n1>5: return True return False elif state.rpm is not None: return max(state.rpm)>0 else: return False #--------------------------------------------------------------------------------------- class TupolevAntiCollisionLightsChecker(AntiCollisionLightsChecker): """Check for the anti-collision light for Tuplev planes.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" numEnginesRunning = 0 for n1 in state.n1: if n1>5: numEnginesRunning += 1 if flight.stage==const.STAGE_PARKING or \ flight.stage==const.STAGE_TAXIAFTERLAND: return numEnginesRunning1 and not state.antiCollisionLightsOn or \ numEnginesRunning<1 and state.antiCollisionLightsOn #--------------------------------------------------------------------------------------- class TupolevLandingLightsChecker(PatientFaultChecker): """Check if the landing light is not switched on above an IAS of 340 km/h.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return state.landingLightsOn and state.ias>(340.0*const.KMPHTOKNOTS) def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleFault(TupolevLandingLightsChecker, state.timestamp, "The landing lights were on above an IAS of 340 km/h", 1) #--------------------------------------------------------------------------------------- class BankChecker(SimpleFaultChecker): """Check for the bank is within limits.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" isXPlane = (flight.aircraftType==const.AIRCRAFT_DH8D or flight.aircraftType==const.AIRCRAFT_B733) and \ (flight.fsType==const.SIM_XPLANE12 or flight.fsType==const.SIM_XPLANE11 or flight.fsType==const.SIM_XPLANE10 or flight.fsType==const.SIM_XPLANE9) if flight.stage==const.STAGE_CRUISE: bankLimit = 40 if isXPlane else 30 elif flight.stage in [const.STAGE_TAKEOFF, const.STAGE_CLIMB, const.STAGE_DESCENT, const.STAGE_LANDING]: bankLimit = 45 if isXPlane else 35 else: return False return state.bank>bankLimit or state.bank<-bankLimit def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" message = "Bank too steep (%.1f)" % (state.bank,) flight.handleFault(BankChecker, state.timestamp, FaultChecker._appendDuring(flight, message), 2) #--------------------------------------------------------------------------------------- class FlapsRetractChecker(SimpleFaultChecker): """Check if the flaps are not retracted too early.""" def __init__(self): """Construct the flaps checker.""" self._timeStart = None def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds. FIXME: check if this really is the intention (FlapsRetractedMistake.java)""" if (flight.stage==const.STAGE_TAKEOFF and not state.onTheGround and aircraft.type!=const.AIRCRAFT_F70) or \ (flight.stage==const.STAGE_LANDING and state.onTheGround): if self._timeStart is None: self._timeStart = state.timestamp if state.flapsSet==0 and state.timestamp<=(self._timeStart+2.0): return True else: self._timeStart = None return False def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleFault(FlapsRetractChecker, state.timestamp, FaultChecker._appendDuring(flight, "Flaps retracted"), 20) #--------------------------------------------------------------------------------------- class FlapsSpeedLimitChecker(SimpleFaultChecker): """Check if the flaps are extended only at the right speeds.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" speedLimit = aircraft.getFlapsSpeedLimit(state.flapsSet) return speedLimit is not None and state.smoothedIAS>speedLimit def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleFault(FlapsSpeedLimitChecker, state.timestamp, FaultChecker._appendDuring(flight, "Flap speed limit fault"), 5) #--------------------------------------------------------------------------------------- class GearsDownChecker(SimpleFaultChecker): """Check if the gears are down at low altitudes.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return state.radioAltitude<10 and not state.gearsDown and \ flight.stage!=const.STAGE_TAKEOFF def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleNoGo(GearsDownChecker, state.timestamp, "Gears not down at %.0f feet radio altitude" % \ (state.radioAltitude,), "GEAR DOWN NO GO") #--------------------------------------------------------------------------------------- class GearSpeedLimitChecker(PatientFaultChecker): """Check if the gears not down at too high a speed.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return state.gearsDown and state.smoothedIAS>aircraft.gearSpeedLimit def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleFault(GearSpeedLimitChecker, state.timestamp, FaultChecker._appendDuring(flight, "Gear speed limit fault"), 5) #--------------------------------------------------------------------------------------- class GLoadChecker(SimpleFaultChecker): """Check if the G-load does not exceed 2 except during flare.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return state.gLoad>2.0 and not state.onTheGround and \ (flight.stage!=const.STAGE_LANDING or state.radioAltitude>=50) def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleFault(GLoadChecker, state.timestamp, "G-load was %.2f" % (state.gLoad,), 10) #--------------------------------------------------------------------------------------- class LandingLightsChecker(PatientFaultChecker): """Check if the landing lights are used properly.""" def getTimeout(self, flight, aircraft, oldState, state): """Get the timeout. It is the default timeout except for landing and takeoff.""" return 0.0 if flight.stage in [const.STAGE_TAKEOFF, const.STAGE_LANDING] else self._timeout def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return state.landingLightsOn is not None and \ ((flight.stage==const.STAGE_BOARDING and \ state.landingLightsOn and state.onTheGround) or \ (flight.stage==const.STAGE_TAKEOFF and \ not state.landingLightsOn and not state.onTheGround) or \ (flight.stage in [const.STAGE_CLIMB, const.STAGE_CRUISE, const.STAGE_DESCENT] and \ state.landingLightsOn and state.altitude>12500) or \ (flight.stage==const.STAGE_LANDING and \ not state.landingLightsOn and state.onTheGround and state.ias>50.0) or \ (flight.stage==const.STAGE_PARKING and \ state.landingLightsOn and state.onTheGround)) def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" score = 0 if flight.stage in [const.STAGE_TAKEOFF, const.STAGE_LANDING] else 1 message = "Landing lights were %s" % \ (("on" if state.landingLightsOn else "off"),) flight.handleFault(LandingLightsChecker, state.timestamp, FaultChecker._appendDuring(flight, message), score) #--------------------------------------------------------------------------------------- class TransponderChecker(PatientFaultChecker): """Check if the transponder is used properly.""" def __init__(self): """Construct the transponder checker.""" super(TransponderChecker, self).__init__() self._liftOffTime = None def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" if state.onTheGround: self._liftOffTime = None elif self._liftOffTime is None: self._liftOffTime = state.timestamp return state.xpdrC is not None and \ not state.xpdrC and \ (flight.stage in [const.STAGE_CRUISE, const.STAGE_DESCENT, const.STAGE_GOAROUND] or \ (flight.stage==const.STAGE_LANDING and state.groundSpeed>50.0) or \ ((not state.autoXPDR or \ (self._liftOffTime is not None and state.timestamp > (self._liftOffTime+8))) and \ ((flight.stage==const.STAGE_TAKEOFF and not state.onTheGround) or flight.stage==const.STAGE_CLIMB)) ) def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" score = 0 message = "Transponder was %s" % \ (("mode C" if state.xpdrC else "standby"),) flight.handleFault(TransponderChecker, state.timestamp, FaultChecker._appendDuring(flight, message), score) #--------------------------------------------------------------------------------------- class WeightChecker(PatientFaultChecker): """Base class for checkers that check that some limit is not exceeded.""" def __init__(self, name): """Construct the checker.""" super(WeightChecker, self).__init__(timeout = 5.0) self._name = name def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" if flight.entranceExam: return False limit = self.getLimit(flight, aircraft, state) if limit is not None: #if flight.options.compensation is not None: # limit += flight.options.compensation return self.getWeight(state)>limit return False def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" mname = "M" + self._name flight.handleNoGo(self.__class__, state.timestamp, "%s exceeded: %s is %.0f kg" % \ (mname, self._name, self.getWeight(state)), "%s NO GO" % (mname,)) def getWeight(self, state): """Get the weight that is interesting for us.""" return state.grossWeight #--------------------------------------------------------------------------------------- class MLWChecker(WeightChecker): """Checks if the MLW is not exceeded on landing.""" def __init__(self): """Construct the checker.""" super(MLWChecker, self).__init__("LW") def getLimit(self, flight, aircraft, state): """Get the limit if we are in the right state.""" return aircraft.mlw if flight.stage==const.STAGE_LANDING and \ state.onTheGround and \ not flight.entranceExam else None #--------------------------------------------------------------------------------------- class MTOWChecker(WeightChecker): """Checks if the MTOW is not exceeded on landing.""" def __init__(self): """Construct the checker.""" super(MTOWChecker, self).__init__("TOW") def getLimit(self, flight, aircraft, state): """Get the limit if we are in the right state.""" return aircraft.mtow if flight.stage==const.STAGE_TAKEOFF and \ not flight.entranceExam else None #--------------------------------------------------------------------------------------- class MZFWChecker(WeightChecker): """Checks if the MZFW is not exceeded on landing.""" def __init__(self): """Construct the checker.""" super(MZFWChecker, self).__init__("ZFW") def getLimit(self, flight, aircraft, state): """Get the limit if we are in the right state.""" return aircraft.mzfw if not flight.entranceExam else None def getWeight(self, state): """Get the weight that is interesting for us.""" return state.zfw #--------------------------------------------------------------------------------------- class NavLightsChecker(PatientFaultChecker): """Check if the navigational lights are used properly.""" def __init__(self): """Construct the NAV lights checker.""" super(NavLightsChecker, self).__init__(timeout = 5.0) def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return flight.stage!=const.STAGE_BOARDING and \ flight.stage!=const.STAGE_PARKING and \ state.navLightsOn is False def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleFault(NavLightsChecker, state.timestamp, FaultChecker._appendDuring(flight, "Navigation lights were off"), 1) #--------------------------------------------------------------------------------------- class WindSensitiveFaultChecker(FaultChecker): """A fault checker which checks for a fault condition that might arise due to suddenly changing winds. If the condition is detected, its fact is logged, and the winds are also logged repeatedly until and for a while after the condition has ceased. A fault is logged only if the condition holds for a certain period of time.""" # The time the winds were logged last. This is global to avoid duplicate # logging if several wind-sensitive fault conditions gold _windsLastLogged = None # The wind logging interval _windsLogInterval = 5.0 def __init__(self, faultTimeout = 30.0, afterLoggingTime = 20.0): """Construct the fault checker.""" self._faultTimeout = faultTimeout self._afterLoggingTime = afterLoggingTime self._faultStarted = None self._faultCeased = None def check(self, flight, aircraft, logger, oldState, state): """Check for the condition and do whatever is needed.""" timestamp = state.timestamp logWinds = False if self.isCondition(flight, aircraft, oldState, state): logWinds = True if self._faultStarted is None: self._faultStarted = timestamp self._faultCeased = None self.logCondition(flight, aircraft, logger, oldState, state, False) WindSensitiveFaultChecker._windsLastLogged = None elif timestamp >= (self._faultStarted + self._faultTimeout): self.logCondition(flight, aircraft, logger, oldState, state, True) self._faultStarted = timestamp else: if self._faultStarted is not None and self._faultCeased is None: self._faultCeased = timestamp self._faultStarted = None if self._faultCeased is not None: if timestamp < (self._faultCeased + self._afterLoggingTime): logWinds = True else: self._faultCeased = None if logWinds and \ (WindSensitiveFaultChecker._windsLastLogged is None or timestamp >= (WindSensitiveFaultChecker._windsLastLogged + self._windsLogInterval)): logger.message(timestamp, "Winds: %.f knots from %.f" % (state.windSpeed, state.windDirection)) WindSensitiveFaultChecker._windsLastLogged = timestamp #--------------------------------------------------------------------------------------- class OverspeedChecker(WindSensitiveFaultChecker): """Check if Vne has been exceeded.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return state.overspeed def logCondition(self, flight, aircraft, logger, oldState, state, isFault): """Log the condition fault.""" if isFault: flight.handleFault(OverspeedChecker, state.timestamp, FaultChecker._appendDuring(flight, "Overspeed"), 20) else: logger.message(state.timestamp, FaultChecker._appendDuring(flight, "Overspeed")) #--------------------------------------------------------------------------------------- class StallChecker(WindSensitiveFaultChecker): """Check if stall occured.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return flight.stage in [const.STAGE_TAKEOFF, const.STAGE_CLIMB, const.STAGE_CRUISE, const.STAGE_DESCENT, const.STAGE_LANDING] and state.stalled def logCondition(self, flight, aircraft, logger, oldState, state, isFault): """Log the condition.""" if isFault: score = 40 if flight.stage in [const.STAGE_TAKEOFF, const.STAGE_LANDING] else 30 flight.handleFault(StallChecker, state.timestamp, FaultChecker._appendDuring(flight, "Stalled"), score) else: logger.message(state.timestamp, FaultChecker._appendDuring(flight, "Stalled")) #--------------------------------------------------------------------------------------- class PayloadChecker(SimpleFaultChecker): """Check if the payload matches the specification.""" TOLERANCE=550 @staticmethod def isZFWFaulty(aircraftZFW, flightZFW): """Check if the given aircraft's ZFW is outside of the limits.""" return aircraftZFW < (flightZFW - PayloadChecker.TOLERANCE) or \ aircraftZFW > (flightZFW + PayloadChecker.TOLERANCE) def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return not flight.entranceExam and \ flight.stage==const.STAGE_PUSHANDTAXI and \ PayloadChecker.isZFWFaulty(state.zfw, flight.zfw) def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleNoGo(PayloadChecker, state.timestamp, "ZFW difference is more than %d kgs" % \ (PayloadChecker.TOLERANCE,), "ZFW NO GO") #--------------------------------------------------------------------------------------- class PitotChecker(PatientFaultChecker): """Check if pitot heat is on.""" def __init__(self): """Construct the checker.""" super(PitotChecker, self).__init__(timeout = 3.0) def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return state.groundSpeed>80 and state.pitotHeatOn is False def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" score = 2 if flight.stage in [const.STAGE_TAKEOFF, const.STAGE_CLIMB, const.STAGE_CRUISE, const.STAGE_DESCENT, const.STAGE_LANDING] else 0 flight.handleFault(PitotChecker, state.timestamp, FaultChecker._appendDuring(flight, "Pitot heat was off"), score) #--------------------------------------------------------------------------------------- class ReverserLogger(StateChecker): """Logger for the reverser.""" def check(self, flight, aircraft, logger, oldState, state): """Log the cruise speed if necessary.""" if oldState is not None and state.reverser != oldState.reverser: reverser = max(state.reverser) logger.message(state.timestamp, "Reverser " + ("unlocked" if reverser else "closed") + (" - %.0f %s" % (flight.speedFromKnots(state.ias), flight.getEnglishSpeedUnit()))) #--------------------------------------------------------------------------------------- class ReverserChecker(SimpleFaultChecker): """Check if the reverser is not used below the speed prescribed for the aircraft.""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return flight.stage in [const.STAGE_DESCENT, const.STAGE_LANDING, const.STAGE_TAXIAFTERLAND] and \ state.reverser and \ state.ias50 and aircraft.hasStrobeLight and \ state.strobeLightsOn is False: self.logSpeedFault(flight, state) else: self._checkPushAndTaxi(flight, aircraft, state) elif flight.stage==const.STAGE_TAXIAFTERLAND: if state.groundSpeed>50: self.logSpeedFault(flight, state) else: self._takeoffState = None def _checkPushAndTaxi(self, flight, aircraft, state): """Check the speed during the push and taxi stage.""" if state.groundSpeed>50: if self._takeoffState is None: self._takeoffState = state self._highestSpeedState = state else: if state.groundSpeed>self._highestSpeedState.groundSpeed: self._highestSpeedState = state if not state.onTheGround: aircraft.setStage(self._takeoffState, const.STAGE_TAKEOFF) self._takeoffState = None elif state.timestamp > (self._takeoffState.timestamp + 60): flight.setRTOState(self._highestSpeedState) elif self._takeoffState is not None: flight.setRTOState(self._highestSpeedState) self._takeoffState = None #--------------------------------------------------------------------------------------- class StrobeLightsChecker(PatientFaultChecker): """Check if the strobe lights are used properly.""" def __init__(self): """Construct the Strobe lights checker.""" super(StrobeLightsChecker, self).__init__(timeout = 5.0) def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" return state.strobeLightsOn is not None and \ ((flight.stage==const.STAGE_BOARDING and \ state.strobeLightsOn and state.onTheGround) or \ (flight.stage==const.STAGE_TAKEOFF and \ not state.strobeLightsOn and not state.gearsDown) or \ (flight.stage in [const.STAGE_CLIMB, const.STAGE_CRUISE, const.STAGE_DESCENT] and \ not state.strobeLightsOn and not state.onTheGround) or \ (flight.stage==const.STAGE_PARKING and \ state.strobeLightsOn and state.onTheGround)) def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" message = "Strobe lights were %s" % (("on" if state.strobeLightsOn else "off"),) flight.handleFault(StrobeLightsChecker, state.timestamp, FaultChecker._appendDuring(flight, message), 1) #--------------------------------------------------------------------------------------- class ThrustChecker(SimpleFaultChecker): """Check if the thrust setting is not too high during takeoff. FIXME: is this really so general, for all aircraft?""" def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" if flight.stage==const.STAGE_TAKEOFF and state.n1 is not None: for n1 in state.n1: if n1 is not None and n1>97: return True return False def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" flight.handleFault(ThrustChecker, state.timestamp, FaultChecker._appendDuring(flight, "Thrust setting was too high (>97%)"), FaultChecker._getLinearScore(97, 110, 0, 10, max(state.n1)), updatePrevious = True) #--------------------------------------------------------------------------------------- class VSChecker(SimpleFaultChecker): """Check if the vertical speed is not too low at certain altitudes""" BELOW10000 = -5000 BELOW5000 = -2500 BELOW2500 = -1500 BELOW500 = -1000 TOLERANCE = 1.2 def isCondition(self, flight, aircraft, oldState, state): """Check if the fault condition holds.""" vs = state.smoothedVS altitude = state.altitude return vs < -8000 or vs > 8000 or \ (altitude<500 and vs < (VSChecker.BELOW500 * VSChecker.TOLERANCE)) or \ (altitude<2500 and vs < (VSChecker.BELOW2500 * VSChecker.TOLERANCE)) or \ (altitude<5000 and vs < (VSChecker.BELOW5000 * VSChecker.TOLERANCE)) or \ (altitude<10000 and vs < (VSChecker.BELOW10000 * VSChecker.TOLERANCE)) def logFault(self, flight, aircraft, logger, oldState, state): """Log the fault.""" vs = state.smoothedVS message = "Vertical speed was %.0f feet/min" % (vs,) if vs>-8000 and vs<8000: message += " at %.0f feet (exceeds company limit)" % (state.altitude,) score = 10 if vs<-8000 or vs>8000 else 0 flight.handleFault(VSChecker, state.timestamp, FaultChecker._appendDuring(flight, message), score) #---------------------------------------------------------------------------------------