由于该文件超过 8 万字符,所以我分 4 次发布,本文是第 1 部分。
该文件位于 nova/virt/libvirt 目录下,文件名为 connection.py。我只是粗略地分析了类中各个方法的作用,细节并没有深入研究,肯定有不少错误或不妥之处,希望大家帮忙指出!
接下来看源代码,其中的中文部分是我加的注释。或许大家会问:为什么要看这个 connection.py 呢?因为我发现上一级 virt 目录下也有一个 connection.py,其中引用了这个文件,所以觉得它应该很重要;而且我发现其中很多方法都是对底层 driver 方法的重写。
- import hashlib
- import functools
- import multiprocessing
- import netaddr
- import os
- import random
- import re
- import shutil
- import sys
- import tempfile
- import time
- import uuid
- from xml.dom import minidom
- from xml.etree import ElementTree
- from eventlet import greenthread
- from eventlet import tpool
- from nova import block_device
- from nova import context as nova_context
- from nova import db
- from nova import exception
- from nova import flags
- import nova.p_w_picpath
- from nova import log as logging
- from nova import utils
- from nova import vnc
- from nova.auth import manager
- from nova.compute import instance_types
- from nova.compute import power_state
- from nova.virt import disk
- from nova.virt import driver
- from nova.virt import p_w_picpaths
- from nova.virt.libvirt import netutils
# Optional third-party modules.  They stay None until get_connection() /
# _late_load_cheetah() import them on first use.
libvirt = None
libxml2 = None
Template = None

LOG = logging.getLogger('nova.virt.libvirt_conn')

FLAGS = flags.FLAGS
flags.DECLARE('live_migration_retry_count', 'nova.compute.manager')

# TODO(vish): These flags should probably go into a shared location
# NOTE(review): every "p_w_picpath" in this file looks like the word "image"
# garbled by the tool that extracted the text -- confirm against upstream
# before renaming anything; the names are used consistently in this file.

# --- rescue images ---------------------------------------------------------
flags.DEFINE_string('rescue_p_w_picpath_id', None, 'Rescue ami p_w_picpath')
flags.DEFINE_string('rescue_kernel_id', None, 'Rescue aki p_w_picpath')
flags.DEFINE_string('rescue_ramdisk_id', None, 'Rescue ari p_w_picpath')

# --- hypervisor selection and domain XML -----------------------------------
flags.DEFINE_string(
    'libvirt_xml_template',
    utils.abspath('virt/libvirt.xml.template'),
    'Libvirt XML Template')
flags.DEFINE_string(
    'libvirt_type',
    'kvm',
    'Libvirt domain type (valid options are: '
    'kvm, lxc, qemu, uml, xen)')
flags.DEFINE_string(
    'libvirt_uri',
    '',
    'Override the default libvirt URI (which is dependent'
    ' on libvirt_type)')

# --- networking, disks and console -----------------------------------------
flags.DEFINE_bool(
    'allow_same_net_traffic',
    True,
    'Whether to allow network traffic from same network')
flags.DEFINE_bool(
    'use_cow_p_w_picpaths',
    True,
    'Whether to use cow p_w_picpaths')
flags.DEFINE_string(
    'ajaxterm_portrange',
    '10000-12000',
    'Range of ports that ajaxterm should randomly try to bind')
flags.DEFINE_string(
    'firewall_driver',
    'nova.virt.libvirt.firewall.IptablesFirewallDriver',
    'Firewall driver (defaults to iptables)')

# --- live / block migration -------------------------------------------------
flags.DEFINE_string(
    'cpuinfo_xml_template',
    utils.abspath('virt/cpuinfo.xml.template'),
    'CpuInfo XML Template (Used only live migration now)')
flags.DEFINE_string(
    'live_migration_uri',
    "qemu+tcp://%s/system",
    'Define protocol used by live_migration feature')
flags.DEFINE_string(
    'live_migration_flag',
    "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER",
    'Define live migration behavior.')
flags.DEFINE_string(
    'block_migration_flag',
    "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER, "
    "VIR_MIGRATE_NON_SHARED_INC",
    'Define block migration behavior.')
flags.DEFINE_integer(
    'live_migration_bandwidth',
    0,
    'Define live migration behavior')

# --- snapshots, VIFs and local volumes --------------------------------------
flags.DEFINE_string(
    'snapshot_p_w_picpath_format',
    None,
    'Snapshot p_w_picpath format (valid options are : '
    'raw, qcow2, vmdk, vdi).'
    'Defaults to same as source p_w_picpath')
flags.DEFINE_string(
    'libvirt_vif_type',
    'bridge',
    'Type of VIF to create.')
flags.DEFINE_string(
    'libvirt_vif_driver',
    'nova.virt.libvirt.vif.LibvirtBridgeDriver',
    'The libvirt VIF driver to configure the VIFs.')
flags.DEFINE_string(
    'default_local_format',
    None,
    'The default format a local_volume will be formatted with '
    'on creation.')
flags.DEFINE_bool(
    'libvirt_use_virtio_for_bridges',
    False,
    'Use virtio for bridge interfaces')
def get_connection(read_only):
    """Return a ``LibvirtConnection``, importing its libraries on demand.

    ``libvirt`` and ``libxml2`` are loaded late and bound to the module-level
    globals of the same name, so there is no need to install them when
    libvirt is not being used.  Cheetah goes through a separate helper
    because the unit tests want to load Cheetah, but not libvirt.

    :param read_only: handed unchanged to ``LibvirtConnection``.
    :returns: a newly constructed ``LibvirtConnection``.
    """
    global libvirt, libxml2

    if libvirt is None:
        libvirt = __import__('libvirt')

    if libxml2 is None:
        libxml2 = __import__('libxml2')

    # Deferred import of the template engine.
    _late_load_cheetah()

    return LibvirtConnection(read_only)
def _late_load_cheetah():
    """Bind Cheetah's ``Template`` class to the module-level ``Template``.

    The import happens only while the global is still ``None``, so repeated
    calls are cheap and Cheetah is not required until it is first needed.

    A regular ``from ... import`` statement is used instead of
    ``__import__(..., -1)``: the ``-1`` level is rejected with ``ValueError``
    on Python 3, while the statement form resolves the same way on Python 2
    and also works on Python 3.
    """
    global Template
    if Template is None:
        from Cheetah.Template import Template
- def _get_eph_disk(ephemeral):
- return 'disk.eph' + str(ephemeral['num'])
- class LibvirtConnection(driver.ComputeDriver):
- # LibvirtConnection inherits from driver.ComputeDriver.
- # A class diagram of LibvirtConnection is included later in this series to make the structure easier to follow.
def __init__(self, read_only):
    """Initialise driver state from FLAGS.

    Reads the domain and cpuinfo XML templates, remembers the libvirt URI
    and the read-only setting, and instantiates the firewall and VIF
    drivers named by the corresponding flags.  ``self._wrapped_conn``
    starts out as ``None``; ``_get_connection`` fills it in later.

    :param read_only: stored on ``self.read_only`` and later passed to
                      ``_connect``.
    """
    # Initialise the parent driver class first.
    super(LibvirtConnection, self).__init__()

    # URI is derived from FLAGS.libvirt_type / FLAGS.libvirt_uri.
    self.libvirt_uri = self.get_uri()

    # Load the XML templates.  ``with`` closes each file as soon as it has
    # been read instead of leaving the handle to the garbage collector.
    with open(FLAGS.libvirt_xml_template) as template_file:
        self.libvirt_xml = template_file.read()
    with open(FLAGS.cpuinfo_xml_template) as template_file:
        self.cpuinfo_xml = template_file.read()

    self._wrapped_conn = None
    self.read_only = read_only

    # The firewall driver receives the bound _get_connection method so it
    # can obtain the libvirt connection through this object.
    fw_class = utils.import_class(FLAGS.firewall_driver)
    self.firewall_driver = fw_class(get_connection=self._get_connection)

    self.vif_driver = utils.import_object(FLAGS.libvirt_vif_driver)
def init_host(self, host):
    """Do nothing for this driver.

    NOTE(nsokolov): moved instance restarting to ComputeManager

    :param host: unused.
    """
    return None
def _get_connection(self):
    """Return the cached libvirt connection, (re)connecting when needed.

    A new connection is opened with ``_connect`` when nothing is cached
    yet or when ``_test_connection`` reports the cached one as unusable.
    """
    have_live_conn = self._wrapped_conn and self._test_connection()
    if not have_live_conn:
        LOG.debug(_('Connecting to libvirt: %s'), self.libvirt_uri)
        self._wrapped_conn = self._connect(self.libvirt_uri,
                                           self.read_only)
    return self._wrapped_conn

# Attribute-style access: every read of ``self._conn`` runs _get_connection.
_conn = property(_get_connection)
def _test_connection(self):
    """Probe the cached connection with ``getCapabilities()``.

    :returns: ``True`` when the call succeeds; ``False`` when libvirt
              raises an error with code ``VIR_ERR_SYSTEM_ERROR`` from
              domain ``VIR_FROM_REMOTE``.
    :raises libvirt.libvirtError: any other libvirt error is re-raised.
    """
    try:
        self._wrapped_conn.getCapabilities()
    except libvirt.libvirtError as exc:
        if exc.get_error_code() != libvirt.VIR_ERR_SYSTEM_ERROR:
            raise
        if exc.get_error_domain() != libvirt.VIR_FROM_REMOTE:
            raise
        LOG.debug(_('Connection to libvirt broke'))
        return False
    else:
        return True
def get_uri(self):
    """Return the libvirt URI to connect to.

    ``FLAGS.libvirt_uri`` wins when it is set; otherwise the default for
    ``FLAGS.libvirt_type`` is used, falling back to ``qemu:///system`` for
    any type that has no entry of its own.
    """
    default_uri_by_type = {
        'uml': 'uml:///system',
        'xen': 'xen:///',
        'lxc': 'lxc:///',
    }
    fallback = default_uri_by_type.get(FLAGS.libvirt_type, 'qemu:///system')
    return FLAGS.libvirt_uri or fallback
def _connect(self, uri, read_only):
    """Open and return a libvirt connection to ``uri``.

    :param uri: libvirt connection URI.
    :param read_only: when true ``libvirt.openReadOnly`` is used,
                      otherwise ``libvirt.openAuth``.
    """
    # NOTE(review): the second element is passed where libvirt's openAuth
    # presumably expects a credential callback -- confirm 'root' is intended.
    credential_types = [libvirt.VIR_CRED_AUTHNAME,
                        libvirt.VIR_CRED_NOECHOPROMPT]
    auth = [credential_types, 'root', None]

    if not read_only:
        return libvirt.openAuth(uri, auth, 0)
    return libvirt.openReadOnly(uri)
def list_instances(self):
    """Return the names of the domains listed by ``listDomainsID()``."""
    names = []
    for domain_id in self._conn.listDomainsID():
        domain = self._conn.lookupByID(domain_id)
        names.append(domain.name())
    return names
def _map_to_instance_info(self, domain):
    """Gets info from a virsh domain object into an InstanceInfo

    :param domain: object providing ``info()`` and ``name()``.
    :returns: ``driver.InstanceInfo`` built from the domain's name and
              state.
    """
    # domain.info() returns a list of:
    #    state:       one of the state values (virDomainState)
    #    maxMemory:   the maximum memory used by the domain
    #    memory:      the current amount of memory used by the domain
    #    nbVirtCPU:   the number of virtual CPU
    #    cpuTime:     the time used by the domain in nanoseconds
    domain_info = domain.info()
    (state, _max_mem, _mem, _num_cpu, _cpu_time) = domain_info

    # Only the name and the state are carried over.
    return driver.InstanceInfo(domain.name(), state)
def list_instances_detail(self):
    """Return one instance-info object per domain from ``listDomainsID()``.

    Each domain is looked up by ID and converted with
    ``_map_to_instance_info``.
    """
    return [self._map_to_instance_info(self._conn.lookupByID(domain_id))
            for domain_id in self._conn.listDomainsID()]
def plug_vifs(self, instance, network_info):
    """Plugin VIFs into networks.

    :param instance: instance handed to the VIF driver.
    :param network_info: iterable of ``(network, mapping)`` pairs; the VIF
                         driver's ``plug`` is called once per pair.
    """
    for vif in network_info:
        network, mapping = vif
        self.vif_driver.plug(instance, network, mapping)
def destroy(self, instance, network_info, cleanup=True):
    """Destroy (shutdown and delete) the specified instance.

    :param instance: mapping providing at least ``'name'``.
    :param network_info: iterable of ``(network, mapping)`` pairs whose
                         VIFs are unplugged.
    :param cleanup: when true the instance files are removed through
                    ``_cleanup`` as the last step.
    :returns: ``True``.
    :raises libvirt.libvirtError: when destroying the domain fails for a
            reason other than "already shut off", or when undefining it
            fails.
    """
    # NOTE(review): SOURCE lost its indentation; the undefine and unplug
    # steps are placed inside the "domain exists" branch -- confirm against
    # upstream.
    instance_name = instance['name']

    try:
        virt_dom = self._lookup_by_name(instance_name)
    except exception.NotFound:
        # If the instance is already terminated, we're still happy
        virt_dom = None

    if virt_dom is not None:
        try:
            virt_dom.destroy()
        except libvirt.libvirtError as e:
            errcode = e.get_error_code()
            already_off = False
            if errcode == libvirt.VIR_ERR_OPERATION_INVALID:
                # If the instance if already shut off, we get this:
                # Code=55 Error=Requested operation is not valid:
                # domain is not running
                (state, _max_mem, _mem, _cpus, _t) = virt_dom.info()
                already_off = (state == power_state.SHUTOFF)

            if not already_off:
                LOG.warning(_("Error from libvirt during destroy of "
                              "%(instance_name)s. Code=%(errcode)s "
                              "Error=%(e)s") %
                            {'instance_name': instance_name,
                             'errcode': errcode,
                             'e': e})
                raise

        try:
            # NOTE(justinsb): We remove the domain definition. We probably
            # would do better to keep it if cleanup=False (e.g. volumes?)
            # (e.g. #2 - not losing machines on failure)
            virt_dom.undefine()
        except libvirt.libvirtError as e:
            errcode = e.get_error_code()
            LOG.warning(_("Error from libvirt during undefine of "
                          "%(instance_name)s. Code=%(errcode)s "
                          "Error=%(e)s") %
                        {'instance_name': instance_name,
                         'errcode': errcode,
                         'e': e})
            raise

        for vif in network_info:
            network, mapping = vif
            self.vif_driver.unplug(instance, network, mapping)

    def _wait_for_destroy():
        """Called at an interval until the VM is gone."""
        name = instance['name']
        try:
            # Only the NotFound outcome matters; the state is not used.
            state = self.get_info(name)['state']
        except exception.NotFound:
            LOG.info(_("Instance %s destroyed successfully.") % name)
            raise utils.LoopingCallDone

    poller = utils.LoopingCall(_wait_for_destroy)
    poller.start(interval=0.5, now=True)

    self.firewall_driver.unfilter_instance(instance,
                                           network_info=network_info)

    if cleanup:
        self._cleanup(instance)

    return True
def _cleanup(self, instance):
    """Delete the on-disk files of ``instance``.

    The target is ``FLAGS.instances_path`` joined with the instance name.
    For the ``lxc`` libvirt type the container is destroyed first; the
    directory is then removed if it exists.
    """
    instance_name = instance['name']
    target = os.path.join(FLAGS.instances_path, instance_name)

    LOG.info(_('instance %(instance_name)s: deleting instance files'
               ' %(target)s') % {'instance_name': instance_name,
                                 'target': target})

    if FLAGS.libvirt_type == 'lxc':
        disk.destroy_container(target, instance,
                               nbd=FLAGS.use_cow_p_w_picpaths)

    if not os.path.exists(target):
        return
    shutil.rmtree(target)
@exception.wrap_exception()
def attach_volume(self, instance_name, device_path, mountpoint):
    """Attach the volume at ``device_path`` to a running instance.

    :param instance_name: name used to look up the libvirt domain.
    :param device_path: volume location; ``_get_volume_device_info``
                        splits it into a type, protocol and name.
    :param mountpoint: guest device path; only the part after the last
                       ``/`` is used as the target device.
    """
    virt_dom = self._lookup_by_name(instance_name)
    mount_device = mountpoint.rpartition("/")[2]
    (dev_type, protocol, name) = \
        self._get_volume_device_info(device_path)

    # NOTE(review): ``xml`` stays unbound when dev_type is neither 'block'
    # nor 'network'; presumably _get_volume_device_info never returns
    # anything else -- confirm.
    if dev_type == 'block':
        xml = """<disk type='block'>
                     <driver name='qemu' type='raw'/>
                     <source dev='%s'/>
                     <target dev='%s' bus='virtio'/>
                 </disk>""" % (device_path, mount_device)
    elif dev_type == 'network':
        xml = """<disk type='network'>
                     <driver name='qemu' type='raw'/>
                     <source protocol='%s' name='%s'/>
                     <target dev='%s' bus='virtio'/>
                 </disk>""" % (protocol, name, mount_device)

    virt_dom.attachDevice(xml)
def _get_disk_xml(self, xml, device):
    """Returns the xml for the disk mounted at device

    :param xml: domain XML document as a string.
    :param device: target device name to look for.
    :returns: the matching ``<disk>`` node as a string, or ``None`` when
              the XML cannot be parsed or no disk has a ``<target>`` child
              whose ``dev`` property equals ``device``.
    """
    try:
        doc = libxml2.parseDoc(xml)
    except Exception:
        return None

    ctx = doc.xpathNewContext()
    try:
        for disk_node in ctx.xpathEval('/domain/devices/disk'):
            for child in disk_node.children:
                if child.name == 'target' and child.prop('dev') == device:
                    return str(disk_node)
        return None
    finally:
        # Release the libxml2 objects on every exit path.
        if ctx is not None:
            ctx.xpathFreeContext()
        if doc is not None:
            doc.freeDoc()
@exception.wrap_exception()
def detach_volume(self, instance_name, mountpoint):
    """Detach the disk mounted at ``mountpoint`` from an instance.

    :param instance_name: name used to look up the libvirt domain.
    :param mountpoint: guest device path; only the part after the last
                       ``/`` is used to find the disk.
    :raises exception.DiskNotFound: when the domain XML has no disk for
            that device.
    """
    virt_dom = self._lookup_by_name(instance_name)
    mount_device = mountpoint.rsplit("/", 1)[-1]

    disk_xml = self._get_disk_xml(virt_dom.XMLDesc(0), mount_device)
    if disk_xml:
        virt_dom.detachDevice(disk_xml)
        return

    raise exception.DiskNotFound(location=mount_device)
@exception.wrap_exception()
def snapshot(self, context, instance, p_w_picpath_href):
    """Create snapshot from a running VM instance.

    This command only works with qemu 0.14+

    The method takes a libvirt snapshot of the domain, converts it with
    ``qemu-img convert`` into a file in a fresh temporary directory,
    uploads that file together with the assembled metadata, and finally
    removes the temporary directory and the libvirt snapshot.  The two
    cleanup steps run in ``finally`` blocks, so they also happen when the
    conversion or the upload fails; the failure itself still propagates.

    NOTE(review): identifiers containing "p_w_picpath" look like the word
    "image" garbled by text extraction; they are kept because the rest of
    the file uses the same spelling -- confirm against upstream.

    :param context: request context passed to the image service calls.
    :param instance: mapping providing ``'name'``, ``'p_w_picpath_ref'``,
                     ``'kernel_id'``, ``'ramdisk_id'`` and ``'project_id'``.
    :param p_w_picpath_href: reference to the snapshot's image record.
    """
    virt_dom = self._lookup_by_name(instance['name'])

    # Record of the image the instance was booted from.
    (image_service, image_id) = nova.p_w_picpath.get_p_w_picpath_service(
        context, instance['p_w_picpath_ref'])
    base = image_service.show(context, image_id)

    # Record of the snapshot image that is about to be filled in.
    (snapshot_image_service, snapshot_image_id) = \
        nova.p_w_picpath.get_p_w_picpath_service(context, p_w_picpath_href)
    snapshot_record = snapshot_image_service.show(context,
                                                  snapshot_image_id)

    metadata = {
        'is_public': False,
        'status': 'active',
        'name': snapshot_record['name'],
        'properties': {
            'kernel_id': instance['kernel_id'],
            'p_w_picpath_location': 'snapshot',
            'p_w_picpath_state': 'available',
            'owner_id': instance['project_id'],
            'ramdisk_id': instance['ramdisk_id'],
        },
    }
    if 'architecture' in base['properties']:
        arch = base['properties']['architecture']
        metadata['properties']['architecture'] = arch

    # The output format must be derived from the base image's format
    # *before* source_format is overridden for copy-on-write disks.
    source_format = base.get('disk_format') or 'raw'
    image_format = FLAGS.snapshot_p_w_picpath_format or source_format
    if FLAGS.use_cow_p_w_picpaths:
        source_format = 'qcow2'
    metadata['disk_format'] = image_format

    if 'container_format' in base:
        metadata['container_format'] = base['container_format']

    # Make the snapshot
    snapshot_name = uuid.uuid4().hex
    snapshot_xml = """
    <domainsnapshot>
        <name>%s</name>
    </domainsnapshot>
    """ % snapshot_name
    snapshot_ptr = virt_dom.snapshotCreateXML(snapshot_xml, 0)

    try:
        # Find the disk: the first devices/disk/source element is used.
        xml_desc = virt_dom.XMLDesc(0)
        domain = ElementTree.fromstring(xml_desc)
        source = domain.find('devices/disk/source')
        disk_path = source.get('file')

        # Export the snapshot into a temporary directory
        temp_dir = tempfile.mkdtemp()
        try:
            out_path = os.path.join(temp_dir, snapshot_name)
            qemu_img_cmd = ('qemu-img',
                            'convert',
                            '-f',
                            source_format,
                            '-O',
                            image_format,
                            '-s',
                            snapshot_name,
                            disk_path,
                            out_path)
            utils.execute(*qemu_img_cmd)

            # Upload the exported file together with the metadata
            with open(out_path) as image_file:
                image_service.update(context,
                                     p_w_picpath_href,
                                     metadata,
                                     image_file)
        finally:
            # Clean up the exported file even if convert/upload failed.
            shutil.rmtree(temp_dir)
    finally:
        # Always drop the libvirt snapshot created above.
        snapshot_ptr.delete(0)
@exception.wrap_exception()
def reboot(self, instance, network_info, xml=None):
    """Reboot a virtual machine, given an instance reference.

    This method actually destroys and re-creates the domain to ensure the
    reboot happens, as the guest OS cannot ignore this action.

    :param instance: mapping providing at least ``'name'``.
    :param network_info: passed to ``destroy``, ``plug_vifs`` and the
                         firewall driver calls.
    :param xml: domain XML to re-create the domain from; when empty the
                XML of the currently defined domain is used.
    :returns: the result of starting the polling timer.
    """
    virt_dom = self._conn.lookupByName(instance['name'])

    # NOTE(itoumsn): Use XML delived from the running instance
    # instead of using to_xml(instance, network_info). This is almost
    # the ultimate stupid workaround.
    if not xml:
        xml = virt_dom.XMLDesc(0)

    # NOTE(itoumsn): self.shutdown() and wait instead of self.destroy() is
    # better because we cannot ensure flushing dirty buffers
    # in the guest OS. But, in case of KVM, shutdown() does not work...
    self.destroy(instance, network_info, cleanup=False)

    # Bring networking and filtering back before the domain is re-created.
    self.plug_vifs(instance, network_info)
    self.firewall_driver.setup_basic_filtering(instance, network_info)
    self.firewall_driver.prepare_instance_filter(instance, network_info)
    self._create_new_domain(xml)
    self.firewall_driver.apply_instance_filter(instance, network_info)

    def _wait_for_reboot():
        """Called at an interval until the VM is running again."""
        name = instance['name']

        try:
            state = self.get_info(name)['state']
        except exception.NotFound:
            LOG.error(_("During reboot, %s disappeared.") % name)
            raise utils.LoopingCallDone

        if state != power_state.RUNNING:
            # Not up yet; keep polling.
            return

        LOG.info(_("Instance %s rebooted successfully.") % name)
        raise utils.LoopingCallDone

    poller = utils.LoopingCall(_wait_for_reboot)
    return poller.start(interval=0.5, now=True)
@exception.wrap_exception()
def pause(self, instance, callback):
    """Pause VM instance

    Looks the domain up by ``instance.name`` and calls its ``suspend()``.
    ``callback`` is accepted but not used here.
    """
    self._lookup_by_name(instance.name).suspend()
@exception.wrap_exception()
def unpause(self, instance, callback):
    """Unpause paused VM instance

    Looks the domain up by ``instance.name`` and calls its ``resume()``.
    ``callback`` is accepted but not used here.
    """
    self._lookup_by_name(instance.name).resume()
@exception.wrap_exception()
def suspend(self, instance, callback):
    """Suspend the specified instance

    Looks the domain up by ``instance.name`` and calls ``managedSave(0)``
    on it.  ``callback`` is accepted but not used here.
    """
    self._lookup_by_name(instance.name).managedSave(0)
@exception.wrap_exception()
def resume(self, instance, callback):
    """resume the specified instance

    Looks the domain up by ``instance.name`` and calls its ``create()``.
    ``callback`` is accepted but not used here.
    """
    self._lookup_by_name(instance.name).create()
- @exception.wrap_exception()