import requests
import ssl
import logging
from os import environ
from requests.exceptions import ConnectionError
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager
from datetime import datetime
from etcd.config import HOST_FAIL_WAIT_S
from etcd.directory_ops import DirectoryOps
from etcd.node_ops import NodeOps
from etcd.server_ops import ServerOps
from etcd.stat_ops import StatOps
from etcd.inorder_ops import InOrderOps
from etcd.modules.lock import LockMod
from etcd.modules.leader import LeaderMod
from etcd.response import ResponseV2
logging.getLogger('requests.packages.urllib3').setLevel(logging.WARN)
_logger = logging.getLogger(__name__)
_SSL_DO_VERIFY = bool(int(environ.get('PEC_SSL_DO_VERIFY', '1')))
_SSL_CA_BUNDLE_FILEPATH = environ.get(
'PEC_SSL_CA_BUNDLE_FILEPATH',
'') or None
_SSL_CLIENT_CRT_FILEPATH = environ.get(
'PEC_SSL_CLIENT_CRT_FILEPATH',
'') or None
_SSL_CLIENT_KEY_FILEPATH = environ.get(
'PEC_SSL_CLIENT_KEY_FILEPATH',
'') or None
class _Ssl3HttpAdapter(HTTPAdapter):
""""Transport adapter" that allows us to use SSLv3."""
def init_poolmanager(self, connections, maxsize, block=False):
self.poolmanager = PoolManager(num_pools=connections,
maxsize=maxsize,
block=block,
ssl_version=ssl.PROTOCOL_SSLv3)
class _Modules(object):
"""Intermediate container that holds functionality related to modules.
:param client: Client instance
:type client: :class:`etcd.client.Client`
"""
def __init__(self, client):
self.__client = client
@property
def lock(self):
"""Return an instance of the class having the lock functionality.
:rtype: :class:`etcd.response.ResponseV2`
"""
try:
return self.__lock
except AttributeError:
self.__lock = LockMod(self.__client)
return self.__lock
@property
def leader(self):
"""Return an instance of the class having the leader-election
functionality.
:rtype: :class:`etcd.modules.leader.LeaderMod`
"""
try:
return self.__leader
except AttributeError:
self.__leader = LeaderMod(self.__client)
return self.__leader
[docs]class Client(object):
"""The main channel of functionality for the client. Connects to the
server, and provides functions via properties.
:param host: Hostname or IP of server
:type host: string
:param port: Port of server
:type port: int
:param is_ssl: Whether to use 'http://' or 'https://'.
:type is_ssl: bool
:param ssl_do_verify: Whether to verify the certificate hostname.
:type ssl_do_verify: bool or None
:param ssl_ca_bundle_filepath: A bundle of rootCAs for verifications.
:type ssl_ca_bundle_filepath: string or None
:param ssl_client_cert_filepath: A client certificate, for authentication.
:type ssl_client_cert_filepath: string or None
:param ssl_client_key_filepath: A client key, for authentication.
:type ssl_client_key_filepath: string or None
:raises: ValueError
"""
def __init__(self, host='127.0.0.1', port=4001,
is_ssl=False, ssl_do_verify=_SSL_DO_VERIFY,
ssl_ca_bundle_filepath=_SSL_CA_BUNDLE_FILEPATH,
ssl_client_cert_filepath=_SSL_CLIENT_CRT_FILEPATH,
ssl_client_key_filepath=_SSL_CLIENT_KEY_FILEPATH):
if ssl_do_verify is not None:
_logger.debug("SSL: Explicit verify setting given: [%s]", ssl_do_verify)
self.__ssl_verify = ssl_do_verify
elif ssl_ca_bundle_filepath is not None:
_logger.debug("SSL: We'll be verifying against a CA bundle: [%s]",
ssl_ca_bundle_filepath)
self.__ssl_verify = ssl_ca_bundle_filepath
else:
_logger.debug("SSL: We'll verify the CA certificate, by default.")
self.__ssl_verify = True
if ssl_client_cert_filepath is None:
_logger.debug("SSL: No client key/certificate will be used.")
self.__ssl_cert = None
elif ssl_client_key_filepath is not None:
_logger.debug("SSL: Client key and certificate will be used: "
"KEY=[%s] CERTIFICATE=[%s]",
ssl_client_key_filepath, ssl_client_cert_filepath)
self.__ssl_cert = \
(ssl_client_cert_filepath,
ssl_client_key_filepath)
else:
_logger.debug("SSL: Client certificate will be used (without a "
"key): [%s]", ssl_client_cert_filepath)
self.__ssl_cert = ssl_client_cert_filepath
scheme = 'http' if is_ssl is False else 'https'
self.__prefix = ('%s://%s:%s' % (scheme, host, port))
_logger.debug("PREFIX= [%s]", self.__prefix)
self.__session = requests.Session()
# Define an adapter for when SSL is requested.
self.__session.mount('https://', _Ssl3HttpAdapter())
# TODO: Remove the version check after debugging.
# TODO: Can we implicitly read the version from the response/headers?
# self.__version = self.server.get_version()
# self.debug("Version: %s" % (self.__version))
#
# if self.__version.startswith('0.2') is False:
# raise ValueError("We don't support an etcd version older than 0.2.0 .")
self.__machines = [[dict(machine_info)['etcd'], None]
for machine_info
in self.server.get_machines()]
_logger.debug("Cluster machines: %s", self.__machines)
# This will fail if the given server does appear in the published list
# of servers. This might only happen because of a hostname being used
# instead of an IP, or vice-versa.
self.__machine_index = None
i = 0
for (prefix, last_fail_dt) in self.__machines:
if prefix == self.__prefix:
self.__machine_index = i
break
i += 1
if self.__machine_index is None:
raise ValueError("Could not identify given prefix [%s] among "
"published prefixes: %s" %
(self.__prefix, self.__machines))
_logger.debug("The initial machine is at index (%d).",
self.__machine_index)
def __str__(self):
return ('<ETCD %s>' % (self.__prefix))
[docs] def send(self, version, verb, path, value=None, parameters=None, data=None,
module=None, return_raw=False, allow_reconnect=True):
"""Build and execute a request.
:param version: Version of API
:type version: int
:param verb: Verb of request ('get', 'post', etc..)
:type verb: string
:param path: URL path
:type path: string
:param value: Value to be converted to string and passed as "value" in
the POST data.
:type value: scalar or None
:param parameters: Dictionary of values to be passed via URL query.
:type parameters: dictionary or None
:param data: Dictionary of values to be passed via POST data.
:type data: dictionary or None
:param module: Name of the etcd module that hosts the functionality.
:type module: string or None
:param return_raw: Whether to return a
:class:`etcd.response.ResponseV2` object or the raw
Requests response.
:type return_raw: bool
:param allow_reconnect: Allow the client to consider alternate hosts if
the current host fails connection.
:type allow_reconnect: bool
:returns: Response object
:rtype: :class:`etcd.response.ResponseV2`
"""
if parameters is None:
parameters = {}
if data is None:
data = {}
if version != 2:
raise ValueError("We were told to send a version (%d) request, "
"which is not supported." % (version))
else:
response_cls = ResponseV2
if module is None:
url = ('%s/v%d%s' % (self.__prefix, version, path))
else:
url = ('%s/mod/v%d/%s%s' % (self.__prefix, version, module, path))
if value is not None:
data['value'] = value
args = { 'params': parameters,
'data': data,
'verify': self.__ssl_verify,
'cert': self.__ssl_cert }
_logger.debug("Request(%s)=[%s] params=[%s] data_keys=[%s]",
verb, url, parameters, args['data'].keys())
send = getattr(self.__session, verb)
while 1:
try:
r = send(url, **args)
except ConnectionError as e:
_logger.debug("Connection error with [%s] [%s]: %s",
self.__prefix, e.__class__.__name__, str(e))
if allow_reconnect is False:
raise
else:
break
# If we get here, there was a connection problem. Rotate the server
# that we're using, excluding any that have recently failed.
now_dt = datetime.now()
self.__machines[self.__machine_index][1] = now_dt
len_ = len(self.__machines)
i = 0
elected = None
while i < len_:
machine_index = (self.__machine_index + 1) % len_
(prefix, last_fail_dt) = self.__machines[machine_index]
if last_fail_dt is None or \
(now_dt - last_fail_dt).total_seconds() > \
HOST_FAIL_WAIT_S:
elected = prefix
i += 1
if elected is None:
raise SystemError("All servers have failed: %s" %
(self.__machines,))
self.__prefix = elected
self.__machine_index = machine_index
_logger.debug("Retrying with next machine: %s", self.__prefix)
r.raise_for_status()
if return_raw is True:
return r
return response_cls(r, verb, path)
@property
[docs] def session(self):
return self.__session
@property
[docs] def prefix(self):
"""Return the URL prefix for the server.
:rtype: string
"""
return self.__prefix
@property
[docs] def directory(self):
"""Return an instance of the class having the directory functionality.
:rtype: :class:`etcd.directory_ops.DirectoryOps`
"""
try:
return self.__directory
except AttributeError:
self.__directory = DirectoryOps(self)
return self.__directory
@property
[docs] def node(self):
"""Return an instance of the class having the general node
functionality.
:rtype: :class:`etcd.node_ops.NodeOps`
"""
try:
return self.__node
except AttributeError:
self.__node = NodeOps(self)
return self.__node
@property
[docs] def server(self):
"""Return an instance of the class having the server functionality.
:rtype: :class:`etcd.server_ops.ServerOps`
"""
try:
return self.__server
except AttributeError:
self.__server = ServerOps(self)
return self.__server
@property
[docs] def stat(self):
"""Return an instance of the class having the stat functionality.
:rtype: :class:`etcd.stat_ops.StatOps`
"""
try:
return self.__stat
except AttributeError:
self.__stat = StatOps(self)
return self.__stat
@property
[docs] def inorder(self):
"""Return an instance of the class having the "in-order keys"
functionality.
:rtype: :class:`etcd.inorder_ops.InOrderOps`
"""
try:
return self.__inorder
except AttributeError:
self.__inorder = InOrderOps(self)
return self.__inorder
@property
[docs] def module(self):
"""Return an instance of the class that hosts the functionality
provided by individual modules.
:rtype: :class:`etcd.client._Modules`
"""
try:
return self.__module
except AttributeError:
self.__module = _Modules(self)
return self.__module