libtask延时任务

2015 年写了 libtask 调度相关的分析,后面说会继续分析延时任务和 channel 的实现, 舔着脸硬是拖到了现在。说没时间其实是借口,少撸一次不就有了。

1) 什么是延时任务

如果不能理解什么延时任务的话。讲真,纯粹是智商问题? 或者语文没学好?

简单的从字母理解就是,任务推迟一段时间执行,复杂的理解,其实也是一样。

具体例子可参考testdelay.c, 这个例子的核心代码就是调用 taskdelay 来做非阻塞的延时。 其他的逻辑比较简单,我们不做分析。

2)协程如何延时?

延时执行最简单的办法就是直接调用 sleep, 但 sleep 会阻塞协程哇~ 因为当前 libtask 做的比较简单,用户在使用协程的时候如果使用阻塞的方法的话,会导致其他协程无法被调度。因为根本就不会切换到调度协程, 这个是使用过程中需要注意的点。 结论就是 sleep 不可行。可行的话,我这里还逼逼毛线。

libtask 实现延时队列的核心就是 poll 超时, 加上一个有序的等待超时队列。

3)代码分析

大部分的程序员有个通病,没有代码很多事情说不清楚,而我不幸的属于大部分。我们下面直接上代码。

3.1 启动系统检查协程

taskdelay 的相关实现在 fd.c, 这个函数首先会简单是否有启动过一个叫 fdtask 的系统协程, 这个协程专门用来检查是否有延时任务或者网络事件需要处理,并把相关的协程加入就绪队列, 后面会详细分析 fdtask。代码如下:

// 启动 fdtask, 作为系统任务
if(!startedfdtask){
   startedfdtask = 1;
   taskcreate(fdtask, 0, 32768);
}  

3.2 加入等待队列

因为 libtask 需要逐个调度任务,对于延时任务来说当然是按照延时到达时间来做调度。所以当有延时任务过来的时候,需要根据延时到达时间来做排序,并放到双向链表。

now = nsec();
// 设置唤醒时间
when = now+(uvlong)ms*1000000;
// sleeping 就是延时队列, 根据唤醒时间来对 task 进行排序
// 这里是为了找到最后一个唤醒时间比当前任务早的task
for(t=sleeping.head; t!=nil && t->alarmtime < when; t=t->next)
     ;

// 省略插入双向链表的
...
taskswitch();

// 如果又切回来,说明该任务又加到了就绪队列,然后被执行到
// 也就是休眠的时间到了, 被唤醒
return (nsec() - now)/1000000; 

taskdelay 的逻辑很简单,根据延时到达时间把协程放入等待队列,然后调用 taskswitch 出让 cpu, 重新切回调度协程。然后等待延时时间到达之后,会重新调度到这里,以来唤醒这个协程继续执行。

3.3 唤醒

我们上面也看到了,taskdelay 其实也就是把协程放入等待的队列,接着就是等待唤醒,重新调度执行这个协程的代码。

那么谁来唤醒的呢? 答案就是我们上面说到的这个 fdtask 系统协程。

这个协程不会不断查询延时队列,看是否有延时已经到达的协程,如果有就会把这个协程加入就绪队列,重新调度。fdtask 同时也会负责网络时间的查询,这个我们这是没有使用到。

void
fdtask(void *v)
{
    int i, ms;
    Task *t;
    uvlong now;

    // task 标记为系统任务
    tasksystem();
    taskname("fdtask");
    for(;;){
        /* let everyone else run */
        // 让党员先跑。哦,不,让其他 task 先跑
        while(taskyield() > 0)
            ;
        /* we're the only one runnable - poll for i/o */
        errno = 0;
        taskstate("poll");

我们把 fdtask 分几部分来分析吧,感觉这样比较不累。第一部分核心的点就是调用 tasksystem() 把该协程标记为系统协程,当只剩下系统协程的时候,程序会退出。

还有另外一个就是我们看到的 while 循环不断调用出让 cpu。这个说明了什么?

"嗯,这个协程很有礼貌"。

这个不断的出让 CPU 直到没有可调度的协程,如果协程比较多的时候可能导致延时会比设定的时间长很多,这个需要注意。

        // 设置 poll 等待时间
        // 如果sleep 队列没有 task 等待唤醒,那么这个 task 也会无限等待
        // 如果没有延时任务, 启动这个 task, 说明是有其他的 fd 读写?
        if((t=sleeping.head) == nil)
            ms = -1;
        else{
            /* sleep at most 5s */
            // 如果是有定时任务,那么最多 sleep 5秒
            now = nsec();
            if(now >= t->alarmtime)
                ms = 0; // poll 设置为 0 为立即返回
            else if(now+5*1000*1000*1000LL >= t->alarmtime)
                ms = (t->alarmtime - now)/1000000;
            else
                ms = 5000;
        }
        // 查询读写事件
        if(poll(pollfd, npollfd, ms) < 0){
            if(errno == EINTR)
                continue;
            fprint(2, "poll: %s\n", strerror(errno));
            taskexitall(0);
        }

接着就是定时任务最核心的部分,直接取 sleeping.head 的延时到达时间,然后设置为 poll 的超时时间。

我们上面说过 sleeping 这个等待队列,是根据延时到达时间来排序的,所以第一个一定是最先到达的任务。

       /* wake up the guys who deserve it */
        // 如果有事件到来,唤醒等待的协程
        for(i=0; i<npollfd; i++){
            while(i < npollfd && pollfd[i].revents){
                // 把事件对应的协程重新加入就绪队列
                taskready(polltask[i]);
                --npollfd;
                pollfd[i] = pollfd[npollfd];
                polltask[i] = polltask[npollfd];
            }
        }

        now = nsec();
        // 检查是否有已经到了唤醒时间的任务,
        // 如果有, 从sleep 队列删除, 加入到就绪队列等待执行
        while((t=sleeping.head) && now >= t->alarmtime){
            deltask(&sleeping, t);
            if(!t->system && --sleepingcounted == 0)
                taskcount--;
            taskready(t);
        }
    }

最后一部分代码可以看到 fdtask 除了处理延时任务还会处理网络读写事件,而且是先检查网络事件,接着再把延时时间到达的任务加到就绪队列。

这里也会有个问题,如果网络事件比较多或者处理比较慢,也会导致延时任务不能及时执行。

END

虽然写的比较长,但延时任务的实现整体上是比较简单的。就是使用 poll + 等待队列。更加详细的代码和注释,可以参考 libtask-annotation