Source code for pyardrone.at

from pyardrone.utils import bits, logging
from pyardrone.at.base import ATCommand
from pyardrone.at import parameters
from pyardrone.abc import BaseClient

import threading
import socket


logger = logging.getLogger(__name__)


__all__ = (
    'REF', 'PCMD', 'PCMD_MAG', 'FTRIM', 'CONFIG',
    'CONFIG_IDS', 'COMWDG', 'CALIB', 'CTRL'
)


class REF(ATCommand):
    '''
    at.REF interface of version 0.5
    '''

    input = parameters.Int32(
        'an integer value, '
        'representing a 32 bit-wide bit-field controlling the drone')

    input._set_flags(
        default=bits(18, 20, 22, 24, 28),  # Always on
        start=bits(9),  # Takeoff / Land
        select=bits(8),  # Switch of emergency mode
    )


REF_0_5 = REF


[docs]class REF(REF_0_5): ''' Controls the basic behaviour of the drone (take-off/landing, emergency stop/reset) ''' _parameters = REF_0_5._parameters def __new__(cls, input=0, *, use_default_bits=True): if int.bit_length(input) > 32: raise ValueError( 'value input {} should be less than 4 bytes'.format(input)) if use_default_bits: input |= cls.input.default return super().__new__(cls, input)
[docs]class PCMD(ATCommand): ''' Send progressive commands - makes the drone move (translate/rotate). ''' flag = parameters.Int32( 'flag enabling the use of progressive commands and/or the Combined' 'Yaw mode (bitfield)') roll = parameters.Float('drone left-right tilt, [-1...1]', default=0) pitch = parameters.Float('drone front-back tilt, [-1...1]', default=0) gaz = parameters.Float('drone vertical speed, [-1...1]', default=0) yaw = parameters.Float('drone angular speed, [-1...1]', default=0) flag._set_flags( absolute_control=bits(2), combined_yaw=bits(1), progressive=bits(0), )
[docs]class PCMD_MAG(ATCommand): ''' Send progressive commands - makes the drone move (translate/rotate). ''' flag = parameters.Int32( 'flag enabling the use of progressive commands and/or the Combined' 'Yaw mode (bitfield)') roll = parameters.Float('drone left-right tilt, [-1...1]') pitch = parameters.Float('drone front-back tilt, [-1...1]') gaz = parameters.Float('drone vertical speed, [-1...1]') yaw = parameters.Float('drone angular speed, [-1...1]') psi = parameters.Float('magneto psi, [-1...1]') psi_accuracy = parameters.Float('magneto psi accuracy, [-1...1]')
[docs]class FTRIM(ATCommand): ''' Flat trims - Tells the drone it is lying horizontally '''
[docs]class CONFIG(ATCommand): ''' Sets an configurable option on the drone ''' key = parameters.String('the name of the option to set') value = parameters.String('the option value')
[docs]class CONFIG_IDS(ATCommand): ''' Identifiers for the next AT*CONFIG command ''' session = parameters.String() user = parameters.String() application_ids = parameters.String()
[docs]class COMWDG(ATCommand): ''' reset communication watchdog '''
[docs]class CALIB(ATCommand): ''' Magnetometer calibration - Tells the drone to calibrate its magnetometer ''' device_number = parameters.Int32( 'Identifier of the device to calibrate - ' 'Choose this identifier from ardrone_calibration_device_t.')
[docs]class CTRL(ATCommand): ''' Not documented in developer guide, change control mode ''' mode = parameters.Int32() mode._set_flags( NO_CONTROL_MODE=0, # Doing nothing ARDRONE_UPDATE_CONTROL_MODE=1, # Not used PIC_UPDATE_CONTROL_MODE=2, # Not used LOGS_GET_CONTROL_MODE=3, # Not used CFG_GET_CONTROL_MODE=4, # Send active configuration file to a client through the # 'control' socket UDP 5559 */ ACK_CONTROL_MODE=5, # Reset command mask in navdata CUSTOM_CFG_GET_CONTROL_MODE=6, # Requests the list of custom configuration IDs ) zero = parameters.Int32(default=0)
class ATClient(BaseClient): connected = False def __init__( self, host='192.168.1.1', port=5556, watchdog_interval=0.5, log_comwdg=False ): self.host = host self.port = port self.watchdog_interval = watchdog_interval self.log_comwdg = log_comwdg self._closed = threading.Event() @property def closed(self): return self._closed.is_set() @closed.setter def closed(self, boolean): if boolean: self._closed.set() else: self._closed.clear() def _connect(self): self.sequence_number = 0 self.sequence_number_mutex = threading.Lock() self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._thread = threading.Thread(target=self._watchdog_job) self._thread.start() def _close(self): self._thread.join() self.sock.close() def send_bytes(self, bytez, *, log=True): self.sock.sendto(bytez, (self.host, self.port)) if log: logger.debug('sent: {!r}', bytez) def send(self, command, *, log=True): ''' :param pyardrone.at.base.ATCommand command: command to send Sends the command to the drone, with an internal increasing sequence number. this method is thread-safe. ''' with self.sequence_number_mutex: self.sequence_number += 1 packed = command._pack(self.sequence_number) self.send_bytes(packed, log=log) def _watchdog_job(self): while not self.closed: self.send(COMWDG(), log=self.log_comwdg) self._closed.wait(timeout=self.watchdog_interval)