学习和研究中前行,并在分享中提升自己

欢迎订阅阿里内推邮件



kvm virtio net 前后端通信分析

阅读次数: 1241| 时间:2017年12月10日 16:32 | 标签:virtio_net

kvm virtio net 源码分析

前言

最近遇到两例子网络不通的问题。虽然现像不一样,但是根本原因都如果出一则:都是前端tx queue处于stopped状态,导致无法发包,且后面再也无法awake 这个queue.

具体代码

分析中使用了3.10人kernel,具体代码在driver/net/virtio_net.c.先看一下具体vm 往外发包的函数

static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev)      
{                                                                               
    struct virtnet_info *vi = netdev_priv(dev);                                 
    int qnum = skb_get_queue_mapping(skb);                                      
    struct send_queue *sq = &vi->sq[qnum];                                      
    int err;                                                                    
    struct netdev_queue *txq = netdev_get_tx_queue(dev, qnum);                  
    bool kick = !skb->xmit_more;                                                
    bool use_napi = sq->napi.weight;                                            

    /* Free up any pending old buffers before queueing new ones. */             
    free_old_xmit_skbs(sq);                                                     

    if (use_napi && kick)                                                       
        virtqueue_enable_cb_delayed(sq->vq);                                    

    /* timestamp packet in software */                                          
    skb_tx_timestamp(skb);                                                      

    /* Try to transmit */                                                       
    err = xmit_skb(sq, skb);                                                    

    /* This should not happen! */                                               
    if (unlikely(err)) {                                                        
        dev->stats.tx_fifo_errors++;                                            
        if (net_ratelimit())                                                    
            dev_warn(&dev->dev,                                                 
                 "Unexpected TXQ (%d) queue failure: %d\n", qnum, err);         
        dev->stats.tx_dropped++;                                                
        dev_kfree_skb_any(skb);                                                 
        return NETDEV_TX_OK;                                                    
    }                                                                           

    /* Don't wait up for transmitted skbs to be freed. */                       
    if (!use_napi) {                                                            
        skb_orphan(skb);                                                        
        nf_reset(skb);                                                          
    }                                                                           

    /* If running out of space, stop queue to avoid getting packets that we     
     * are then unable to transmit.      
    if (sq->vq->num_free < 2+MAX_SKB_FRAGS) {                                  
        netif_stop_subqueue(dev, qnum);                                        
        if (!use_napi &&                                                       
            unlikely(!virtqueue_enable_cb_delayed(sq->vq))) {                  
            /* More just got used, free them then recheck. */                  
            free_old_xmit_skbs(sq);                                            
            if (sq->vq->num_free >= 2+MAX_SKB_FRAGS) {                         
                netif_start_subqueue(dev, qnum);                               
                virtqueue_disable_cb(sq->vq);                                  
            }                                                                  
        }                                                                      
    }                                                                          

    if (kick || netif_xmit_stopped(txq))                                       
        virtqueue_kick(sq->vq);                                                

    return NETDEV_TX_OK;                                                       
}                                                  

这个就是virtio net前端驱动主要的往外发包的函数,那么你也许会问系统会在什么时候调用这个函数呢?具体流程如下

dev_queue_xmit->__dev_queue_xmit->dev_hard_start_xmit->xmit_one->netdev_start_xmit->__netdev_start_xmit->ops->ndo_start_xmit

而对virtio_net来说最终调用的就是start_xmit这个函数。至此,我们已经把整个发包流程梳理清楚了。那么下面来说一下什么情况会发不出去网络包。

前端什么情况下不会再发包出去

当vm里面流量比较的时候,很容易把virtio net的vring buf 给打满了,这个时候内核会就把这个queue 置为stop状态。具体先看代码

//   当num_free小于18的时候就会先stop queue.
    if (sq->vq->num_free < 2+MAX_SKB_FRAGS) {                                  
        netif_stop_subqueue(dev, qnum);                                        
        if (!use_napi &&                                                       
            unlikely(!virtqueue_enable_cb_delayed(sq->vq))) {                  
            /* More just got used, free them then recheck. */                  
            free_old_xmit_skbs(sq);                                            
            if (sq->vq->num_free >= 2+MAX_SKB_FRAGS) {                         
                netif_start_subqueue(dev, qnum);                               
                virtqueue_disable_cb(sq->vq);                                  
            }                                                                  
        }                                                                      
    }                

stop queue其实就是给这个queue置了状态

static __always_inline void netif_tx_stop_queue(struct netdev_queue *dev_queue)
{                                                                              
    set_bit(__QUEUE_STATE_DRV_XOFF, &dev_queue->state);
}

这样当系统发现这个queue为stop状态,那么从这个queue发出去的包就直接被drop了。userspace的表现就是丢包

唤醒stop的queue

读到这里,你也许会问那既然stop了肯定要在某个时候把它awake,否则这个vm就永远发不出去包了。对的,当然会awake,这个就涉及到前后端交互的问题,既然这样,我们先从前端相关数据结构讲起。先看一下send_queue

struct send_queue {
    /* Virtqueue associated with this send _queue */                
    struct virtqueue *vq;                                                     

    /* TX: fragments + linear part + virtio header */                         
    struct scatterlist sg[MAX_SKB_FRAGS + 2];                                 

    /* Name of the send queue: output.$index */                               
    char name[40];                                                            

    struct napi_struct napi;                  
};       

可以看到每个send_queue都包含一个virtqueue,接下来我们再看看virtqueue数据结构

struct virtqueue {                                                            
    struct list_head list;                                                    
    void (*callback)(struct virtqueue *vq);                                   
    const char *name;                                                         
    struct virtio_device *vdev;                                               
    unsigned int index;                                                       
    unsigned int num_free;                                                    
    void *priv;                                                               
}; 

其中num_free表示这个queue上vring上buffer数目。那么每个virtqueue是怎么跟vring_virtqueue对应起来的呢。先看一下vring_virtqueue数据结构

struct vring_virtqueue {                                                        
    struct virtqueue vq;                                                        

    /* Actual memory layout for this queue */                                   
    struct vring vring;                                                         

    /* Can we use weak barriers? */                                             
    bool weak_barriers;                                                         

    /* Other side has made a mess, don't try any more. */                       
    bool broken;                                                                

    /* Host supports indirect buffers */                                        
    bool indirect;                                                              

    /* Host publishes avail event idx */                                        
    bool event;                                                                 

    /* Head of free buffer list. */                                             
    unsigned int free_head;                                                     
    /* Number we've added since last sync. */                                   
    unsigned int num_added;                                                     

    /* Last used index we've seen. */                                           
    u16 last_used_idx;                                                          

    /* Last written value to avail->flags */                                    
    u16 avail_flags_shadow;                                                     

    /* Last written value to avail->idx in guest byte order */                  
    u16 avail_idx_shadow;                                                       

    /* How to notify other side. FIXME: commonalize hcalls! */                  
    bool (*notify)(struct virtqueue *vq);                                       

    /* DMA, allocation, and size information */                                 
    bool we_own_ring;                                                           
    size_t queue_size_in_bytes;                                                 
    dma_addr_t queue_dma_addr;  
    #ifdef DEBUG                                                                  
    /* They're supposed to lock for us. */                                    
    unsigned int in_use;                                                      

    /* Figure out if their kicks are too delayed. */                          
    bool last_add_time_valid;                                                 
    ktime_t last_add_time;                                                    
#endif                                                                        

    /* Per-descriptor state. */                                               
    struct vring_desc_state desc_state[];                                     
};                                                                            

其实你会发现vring_virtqueue里面最开始的位置就是vq,那么通过container_of就可以访问到vring_virtqueue. 接着我们看一下vring_virtqueue里面最重要的数据结构vring

struct vring {                                                                
    unsigned int num;

    struct vring_desc *desc;

    struct vring_avail *avail;

    struct vring_used *used;
};

可以看到vring里面又包含了三个非常重要的数据结构 vring_desc,vring_avail, vring_used.先看一下vring_avail数据结构(注:这三个数据结构前后端是通过共享内存来访问的)

struct vring_avail {                                                          
    __virtio16 flags;
    __virtio16 idx;
    __virtio16 ring[];
};

其中vring_avail flags是用来告诉后端是否要发中断,具体后端代码如下,以dpdk 中的vhost_user为例

   if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))                                                                                                                     
            eventfd_write((int)vq->callfd, 1);

可以看到当avail flags为0时后端就后发中断到guest里面。再看一下idx作用是什么呢?在函数virtqueue_add的代码片断如下

vq->avail_idx_shadow++;                                                   
vq->vring.avail->idx = cpu_to_virtio16(_vq->vdev, vq->avail_idx_shadow);  
vq->num_added++;  

每次发包的时候都会先添加buf,同时idx也会自增,也就是说idx记录了当前guest里面往后端expose了多少buf。

看完vring_avail,再来看一下vring_used。还是先来看一下具体的数据结构

struct vring_used {                                                                
    __virtio16 flags;                                                              
    __virtio16 idx;                                                                
    struct vring_used_elem ring[];                                                 
};  

其中idx主要由后端来更新,表示后端已经处理了多少buf,还以dpdk virtio_txrx.c中rte_vhost_dequeue_burst函数(该函数主要是把guest的代码发往host)部分代码为例子,代码如下:

    if (unlikely(alloc_err == 1))                                           
            break;                                                              

        m->nb_segs = seg_num;                                                   

        pkts[entry_success] = m;                                                
        vq->last_used_idx++;                                                    
        entry_success++;                                                        
    }                                                                           

    rte_compiler_barrier();                                                     
    vq->used->idx += entry_success;                                             
    /* Kick guest if required. */                                               
    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))                       
        eventfd_write((int)vq->callfd, 1);                                      
    return entry_success; 

可以看到后端成功处理完前端ring里面的包之后,就会把vring_used idx更新掉。

现在开始回到正题,先看一下怎么把这个queue唤醒,具体函数如下

static void skb_xmit_done(struct virtqueue *vq)                                                                                                                                   
{                                                                                   
    struct virtnet_info *vi = vq->vdev->priv;                                       
    struct napi_struct *napi = &vi->sq[vq2txq(vq)].napi;                            

    /* Suppress further interrupts. */                                              
    virtqueue_disable_cb(vq);                                                       

    if (napi->weight)                                                               
        virtqueue_napi_schedule(napi, vq);                                          
    else                                                                            
        /* We were probably waiting for more output buffers. */                     
        netif_wake_subqueue(vi->dev, vq2txq(vq)); 
        /*把queue唤醒 */                                  
} 

那么这个函数又是怎么被调用的呢?init_vqs函数里面会调用virtnet_find_vqs,具体代码如下

for (i = 0; i < vi->max_queue_pairs; i++) {                              
        callbacks[rxq2vq(i)] = skb_recv_done;                                
        callbacks[txq2vq(i)] = skb_xmit_done;                                
        sprintf(vi->rq[i].name, "input.%d", i);                              
        sprintf(vi->sq[i].name, "output.%d", i);                             
        names[rxq2vq(i)] = vi->rq[i].name;                                   
        names[txq2vq(i)] = vi->sq[i].name;                                   
        if (ctx)                                                             
            ctx[rxq2vq(i)] = true;                                           
    } 
ret = vi->vdev->config->find_vqs(vi->vdev, total_vqs, vqs, callbacks, names, ctx, NULL);                                                                           

skb_xmit_done是为每个queue注册的一个callback函数,而这个callback函数在什么时候会被调用呢?接下来看一下find_vqs函数

int vp_find_vqs(struct virtio_device *vdev, unsigned nvqs,                         
        struct virtqueue *vqs[], vq_callback_t *callbacks[],                       
        const char * const names[], const bool *ctx,                               
        struct irq_affinity *desc)                                                 
{                                                                                  
    int err;                                                                       

    /* Try MSI-X with one vector per queue. */                                     
    err = vp_find_vqs_msix(vdev, nvqs, vqs, callbacks, names, true, ctx, desc);                                                                                                   
    if (!err)                                                                      
        return 0;                                                                  
    /* Fallback: MSI-X with one vector for config, one shared for queues. */       
    err = vp_find_vqs_msix(vdev, nvqs, vqs, callbacks, names, false, ctx, desc);
    if (!err)                                                                      
        return 0;                                                                  
    /* Finally fall back to regular interrupts. */                                 
    return vp_find_vqs_intx(vdev, nvqs, vqs, callbacks, names, ctx);               
}           

最终会调到__vring_new_virtqueue 这个函数为每个vring初始化callback函数

struct virtqueue *__vring_new_virtqueue(unsigned int index,                         
                    struct vring vring,                                             
                    struct virtio_device *vdev,                                     
                    bool weak_barriers,                                             
                    bool context,                                                   
                    bool (*notify)(struct virtqueue *),                             
                    void (*callback)(struct virtqueue *),                       
                    const char *name)                                           
{                                                                               
    unsigned int i;                                                             
    struct vring_virtqueue *vq;                                                 

    vq = kmalloc(sizeof(*vq) + vring.num * sizeof(struct vring_desc_state),     
             GFP_KERNEL);                                                       
    if (!vq)                                                                    
        return NULL;                                                            

    vq->vring = vring;                                                          
    vq->vq.callback = callback;                                                 
    vq->vq.vdev = vdev;                                                         
    vq->vq.name = name;                                                         
    vq->vq.num_free = vring.num;                                                
    vq->vq.index = index;                                                       
    vq->we_own_ring = false;                                                    
    vq->queue_dma_addr = 0;                                                     
    vq->queue_size_in_bytes = 0;                                                
    vq->notify = notify;                                                        
    vq->weak_barriers = weak_barriers;                                          
    vq->broken = false;                                                         
    vq->last_used_idx = 0;                                                      
    vq->avail_flags_shadow = 0;                                                 
    vq->avail_idx_shadow = 0;                                                   
    vq->num_added = 0;                                                          
    list_add_tail(&vq->vq.list, &vdev->vqs); 

然后通过request_irq为每个vq注册msix的中断函数

err = request_irq(pci_irq_vector(vp_dev->pci_dev, msix_vec),               
                  vring_interrupt, 0,                                                                                                                                             
                  vp_dev->msix_names[msix_vec],                                    
                  vqs[i]);  

然后会在vring_interrupt里面调用skb_xmit_done

irqreturn_t vring_interrupt(int irq, void *_vq)                                 
{                                                                                                                                                                                 
    struct vring_virtqueue *vq = to_vvq(_vq);                                   

    if (!more_used(vq)) {                                                       
        pr_debug("virtqueue interrupt with no work for %p\n", vq);              
        return IRQ_NONE;                                                        
    }                                                                           

    if (unlikely(vq->broken))                                                   
        return IRQ_HANDLED;                                                     

    pr_debug("virtqueue callback for %p (%p)\n", vq, vq->vq.callback);          
    if (vq->vq.callback)                                                        
        vq->vq.callback(&vq->vq);                                               

    return IRQ_HANDLED;                                                         
}                                

到此我们知道只要num_free小于18就会stop queue,只要后端能正常发中断上来就可以把queue 唤醒。只要能唤醒能正常发包就会主动去回收old skb,具体函数见free_old_xmit_skbs。要想知道出问题的时候有没有对buf进行回收,只需要看对vq->last_used_id 和vq->vring.used->idx。如果后者比前者要大,那就说明没有进行回收。通常是因为后端处理完包之后没有把中断发上来。

后端发中断的条件

通常后端发中断的条件有种一种是基于vring.avail->flags 比如 dpdk中的vhost_user。另外一种是基于event 比如vhost_net。下面来看一下具体代码

static inline int vring_need_event(__u16 event_idx, __u16 new_idx, __u16 old)    
{                                                                           
    /* Note: Xen has similar logic for notification hold-off                
     * in include/xen/interface/io/ring.h with req_event and req_prod       
     * corresponding to event_idx + 1 and new_idx respectively.             
     * Note also that req_event and req_prod in Xen start at 1,             
     * event indexes in virtio start at 0. */                               
    return (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old);       
}                                                                           

我们再来看一下调用这个函数的地方

static bool vring_notify(VirtIODevice *vdev, VirtQueue *vq)                     
{                                                                               
    uint16_t old, new;                                                          
    bool v;                                                                     
    /* We need to expose used array entries before checking used event. */      
    smp_mb();                                                                   
    /* Always notify when queue is empty (when feature acknowledge) */          
    if (((vdev->guest_features & (1 << VIRTIO_F_NOTIFY_ON_EMPTY)) &&            
         !vq->inuse && vring_avail_idx(vq) == vq->last_avail_idx)) {            
        return true;                                                            
    }                                                                           

    if (!(vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX))) {             
        return !(vring_avail_flags(vq) & VRING_AVAIL_F_NO_INTERRUPT);           
    }                                                                           

    v = vq->signalled_used_valid;                                               
    vq->signalled_used_valid = true;                                            
    old = vq->signalled_used;                                                   
    new = vq->signalled_used = vring_used_idx(vq);                                                                                                                                
    return !v || vring_need_event(vring_used_event(vq), new, old);              
}                   

vring_used_event具体函数如下

static inline uint16_t vring_used_event(VirtQueue *vq)                      
{                                                                    
    return vring_avail_ring(vq, vq->vring.num);
} 

static inline uint16_t vring_avail_ring(VirtQueue *vq, int i)
{                                                                     
    hwaddr pa;                                                      
    pa = vq->vring.avail + offsetof(VRingAvail, ring[i]);           
    return virtio_lduw_phys(vq->vdev, pa);
}                                     

可以看到vring_used_idx做的事就是从vring_avail->ring[num(256)]处拿到前端已经回收的used buf。而前端存放这个值的代码如下

    if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT))
        virtio_store_mb(vq->weak_barriers,                                  
                &vring_used_event(&vq->vring),
                cpu_to_virtio16(_vq->vdev, vq->last_used_idx));

故后端是不是要通知前端的条件就是当前qemu端已经处理完的buf数量减去之前qemu处理的buf数量大于当前qemu已经处理的buf数量减去前端已经回收的数量,也是说前端处理的比较快。

总结

在前端流量很大的情况下queue很容易进入stop状态,但是正常情况进入stop的同时会告诉后端处理完包后发个中断上来把queue唤醒。正如前文所说的我遇到的两例都是因为后端没有发中断上来,导致前端网络驱动最终无法工作。而且最终的原因也都是因为发中断到guest的fd弄错了。关于这个fd,后面会专门写一篇来详述。