Skip to content

线程池并发服务器

从进程池到线程池

使用进程池的思路来解决并发连接是一种经典的基于事件驱动模型的解决方案,但是由于进程天生具有隔离性,导致进程之间通信十分困难,一种优化的思路就是用线程来取代进程,即所谓的线程池。

首先什么是线程池?

线程池是一个抽象概念,可以简单的认为若干线程在一起运行,线程不退出,等待有任务处理。

为什么要有线程池?

  1. 以网络编程服务器端为例,作为服务器端支持高并发,可以有多个客户端连接,发出请求,对于多个请求我们每次都去建立线程,这样线程会创建很多,而且线程执行完销毁也会有很大的系统开销,使用上效率很低。
  2. 之前在线程篇章中,我们也知道创建线程并非多多益善,所以我们的思路是提前创建好若干个线程,不退出,等待任务的产生,去接收任务处理后等待下一个任务。

线程池如何实现?需要思考2个问题?

  1. 假设线程池创建了,线程们如何去协调接收任务并且处理?
  2. 线程池上的线程如何能够执行不同的请求任务?

上述问题1就很像我们之前学过的生产者和消费者模型,客户端对应生产者,服务器端这边的线程池对应消费者,需要借助互斥锁和条件变量来搞定。

问题2解决思路就是利用回调机制,我们同样可以借助结构体的方式,对任务进行封装,比如任务的数据和任务处理回调都封装在结构体上,这样线程池的工作线程拿到任务的同时,也知道该如何执行了。

由于多线程是共享地址空间的,所以主线程和工作线程天然地通过共享文件描述符数值的形式共享网络文件对象,但是这种共享也会带来麻烦:每当有客户端发起请求时,主线程会分配一个空闲的工作线程完成任务,而任务正是在多个线程之间共享的资源,所以需要采用一定的互斥和同步的机制来避免竞争。

我们可以将任务设计成一个队列,任务队列就成为多个线程同时访问的共享资源,此时问题就转化成了一个典型的生产者——消费者问题:任务队列中的任务就是商品,主线程是生产者,每当有连接到来的时候,就将一个任务放入任务队列,即生产商品,而各个工作线程就是消费者,每当队列中任务到来的时候,就负责取出任务并执行。

下面是线程池的基本设计方案:

首先,我们先设计数据结构:

cpp
//通常把构建实际对象的函数称为工厂函数
//factory.h
#ifndef __FACTORY__
#define __FACTORY__
#include "taskQueue.h"

//这里用来描述整个进程池的信息,也是线程间共享的数据
typedef struct factory_s {
    pthread_t *tidArr;
    int threadNum;
    taskQueue_t taskQueue;
} factory_t;

int factoryInit(factory_t *pFactory, int threadNum);

#endif// __FACTORY__


//任务队列的设计
//taskQueue.h
#ifndef __TASK_QUEUE__
#define __TASK_QUEUE__
#include <func.h>

typedef struct task_s {
    int netFd;
    struct task_s *pNext;
} task_t;

typedef struct taskQueue_s {
    task_t *pFront;
    task_t *pRear;
    int queueSize;        //当前任务的个数
    pthread_mutex_t mutex;//任务队列的锁
    pthread_cond_t cond;
} taskQueue_t;

int taskEnQueue(taskQueue_t *pTaskQueue, int netFd);
int taskDeQueue(taskQueue_t *pTaskQueue);

#endif// __TASK_QUEUE__

随后,设计启动线程池的工厂函数:

cpp
int factoryInit(factory_t *pFactory, int threadNum) {
    bzero(pFactory, sizeof(factory_t));
    pFactory->threadNum = threadNum;
    pFactory->tidArr = (pthread_t *) calloc(threadNum, sizeof(pthread_t));

    pthread_cond_init(&pFactory->cond, NULL);

    bzero(&pFactory->taskQueue, sizeof(taskQueue_t));
    pthread_mutex_init(&pFactory->taskQueue.mutex, NULL);
}


int main(int argc, char *argv[]) {
    //./main 192.168.135.132 5678 10
    ARGS_CHECK(argc, 4);

    int workerNum = atoi(argv[3]);

    factory_t factory;
    factoryInit(&factory, workerNum);
    makeWorker(&factory);

    int sockFd;
    tcpInit(argv[1], argv[2], &sockFd);

    //...
}

接下来,主线程需要 accept 客户端的连接并且需要将任务加入到任务队列。(目前会引发主线程阻塞的行为只有 accept ,但是为了可维护性,即后续的需求可能需要主线程管理更多的文件描述符,所以我们使用 epoll 将网络文件加入监听)。一旦有新的客户端连接,那么主线程就会将新的任务加入任务队列,并且使用条件变量通知子线程。(如果没有空闲的子线程处于等待状态,这个任务会被直接丢弃)

cpp
int epfd = epollCtor();
int sockFd;

tcpInit(argv[1], argv[2], &sockFd);

epollAdd(sockFd, epfd);

struct epoll_event evs[2];

while (1) {
    int readyNum = epoll_wait(epfd, evs, 2, -1);

    for (int i = 0; i < readyNum; ++i) {
        if (evs[i].data.fd == sockFd) {
            int netFd = accept(sockFd, NULL, NULL);

            pthread_mutex_lock(&factory.taskQueue.mutex);
            taskEnQueue(&factory.taskQueue, netFd);
            printf("New Task!\n");
            pthread_cond_signal(&factory.taskQueue.cond);
            pthread_mutex_unlock(&factory.taskQueue.mutex);
        }
    }
}

子线程在启动的时候,会使用条件变量使自己处于阻塞状态,一旦条件满足之后,就立即从任务队列中取出任务并且处理该事件。

cpp
void makeWorker(factory_t *pFactory) {
    for (int i = 0; i < pFactory->threadNum; ++i) {
        pthread_create(pFactory->tidArr + i, NULL, threadFunc, (void *) pFactory);
    }
}


void *threadFunc(void *pArgs) {
    factory_t *pFactory = (factory_t *) pArgs;
    while (1) {
        pthread_mutex_lock(&pFactory->taskQueue.mutex);
        while (pFactory->taskQueue.queueSize == 0) {
            pthread_cond_wait(&pFactory->taskQueue.cond, &pFactory > taskQueue.mutex);
        }

        printf("Get Task!\n");
        int netFd = pFactory->taskQueue.pFront->netFd;
        taskDeQueue(&pFactory->taskQueue);
        pthread_mutex_unlock(&pFactory->taskQueue.mutex);
        
        handleEvent(netFd);
        printf("pthread done! tid = %lu\n", pthread_self());
    }
}


int handleEvent(int netFd) {
    transFile(netFd);
    close(netFd);

    return 0;
}

下面任务队列操作的函数:

cpp
int taskEnQueue(taskQueue_t *pTaskQueue, int netFd) {
    task_t *pTask = (task_t *) calloc(1, sizeof(task_t));
    pTask->netFd = netFd;

    if (pTaskQueue->queueSize == 0) {
        pTaskQueue->pFront = pTask;
        pTaskQueue->pRear = pTask;
    } else {
        pTaskQueue->pRear->pNext = pTask;
        pTaskQueue->pRear = pTask;
    }

    ++pTaskQueue->queueSize;

    return 0;
}


int taskDeQueue(taskQueue_t *pTaskQueue) {
    task_t *pCur = pTaskQueue->pFront;
    pTaskQueue->pFront = pTaskQueue->pFront->pNext;
    free(pCur);
    --pTaskQueue->queueSize;
    
    return 0;
}

线程池的退出

简单退出

编程规范要求,信号进制不能应用在多线程应用中——其主要的原因时当多线程进程执行过程中,一旦产生了信号,则递送的信号的线程是未知的,有可能是主线程也有可能是子线程。这样的话,线程终止的处理就非常繁琐,也不够清晰明了。

解决这个问题方式是在原来设计的基础上引入多进程机制:将进程池改造成一个父进程和一个子进程组成的应用程序。其中父进程负责递送信号,而子进程负责创建和运行进程池(也就是对应之前的已完成代码),父子进程之间通过管道通信。当信号的产生的时候,父进程递送该信号,并且在信号处理函数的执行过程中,写入一个消息给管道,此外,子进程会使用IO多路复用机制监听管道,这样一旦管道就绪,子进程的主线程就可以得知程序将要被终止的信息,随后即可依次关闭子线程。

cpp
int exitPipe[2];

void sigFunc(int signum) {
    printf("signum = %d\n", signum);
    write(exitPipe[1], "1", 1);
    puts("Parent exit!");
}


int main(int argc, char *argv[]) {
    //./main 192.168.135.132 5678 10
    ARGS_CHECK(argc, 4);

    pipe(exitPipe);
    if (fork() != 0) {
        close(exitPipe[0]);
        signal(SIGUSR1, sigFunc);
        wait(NULL);

        exit(0);
    }
    close(exitPipe[1]);

    int workerNum = atoi(argv[3]);

    factory_t factory;
    factoryInit(&factory, workerNum);
    makeWorker(&factory);

    int epfd = epollCtor();
    int sockFd;
    tcpInit(argv[1], argv[2], &sockFd);

    epollAdd(sockFd, epfd);
    epollAdd(exitPipe[0], epfd);

    struct epoll_event evs[2];
    while (1) {
        int readyNum = epoll_wait(epfd, evs, 2, -1);

        for (int i = 0; i < readyNum; ++i) {
            if (evs[i].data.fd == sockFd) {
                int netFd = accept(sockFd, NULL, NULL);

                pthread_mutex_lock(&factory.taskQueue.mutex);
                taskEnQueue(&factory.taskQueue, netFd);
                printf("New Task!\n");
                pthread_cond_signal(&factory.taskQueue.cond);
                pthread_mutex_unlock(&factory.taskQueue.mutex);
            } else if (evs[i].data.fd == exitPipe[0]) {
                puts("exit threadPool!");

                for (int j = 0; j < workerNum; ++j) {
                    pthread_cancel(factory.tidArr[j]);
                }
                for (int j = 0; j < workerNum; ++j) {
                    pthread_join(factory.tidArr[j], NULL);
                }
                puts("done");

                exit(0);
            }
        }
    }
}

然后直接使用上述代码会存在一个问题,那就是只能关闭掉一个子线程,这里的原因其实比较简单 pthread_cond_wait 是一个取消点,所以收到了取消之后,线程会唤醒并终止,然而由于条件变量的设计,所以线程终止的时候它是持有锁的,这就导致死锁。这种死锁的解决方案就是引入资源清理机制,在加锁行为执行的时候立刻将清理行为压入资源清理栈当中。

cpp
void cleanFunc(void *pArgs) {
    factory_t *pFactory = (factory_t *) pArgs;
    pthread_mutex_unlock(&pFactory->taskQueue.mutex);
}


void *threadFunc(void *pArgs) {
    int netFd;

    while (1) {
        factory_t *pFactory = (factory_t *) pArgs;

        pthread_mutex_lock(&pFactory->taskQueue.mutex);
        pthread_cleanup_push(cleanFunc, (void *) pFactory);

        while (pFactory->taskQueue.queueSize == 0) {
            pthread_cond_wait(&pFactory->taskQueue.cond, &pFactory > taskQueue.mutex);
        }

        printf("Get Task!\n");
        netFd = pFactory->taskQueue.pFront->netFd;
        taskDeQueue(&pFactory->taskQueue);
        pthread_cleanup_pop(1);
        handleEvent(netFd);
        printf("pthread done! tid = %lu\n", pthread_self());
    }
}

优雅退出

如果使用 pthread_cancel ,由于读写文件的函数是取消点,那么正在工作线程也会被终止,从而导致正在执行的下载任务无法完成。如何实现线程池的优雅退出呢?一种解决方案就是不使用 pthread_cancel ,而是让每个工作线程在事件循环开始的时候,检查一下线程池是否处于终止的状态,这样子线程就会等待当前任务执行完成了之后才会终止。

cpp
//...//

else if (evs[i].data.fd == exitPipe[0]) {
    puts("exit threadPool!");
    factory.runningFlag = 0;
    pthread_cond_broadcast(&factory.taskQueue.cond);
    for (int j = 0; j < workerNum; ++j) {
        pthread_join(factory.tidArr[j], NULL);
    }

    puts("done");
    exit(0);
}


//..//


void *threadFunc(void *pArgs) {
    int netFd;

    while (1) {
        factory_t *pFactory = (factory_t *) pArgs;
        pthread_mutex_lock(&pFactory->taskQueue.mutex);
        pthread_cleanup_push(cleanFunc, (void *) pFactory);

        while (pFactory->taskQueue.queueSize == 0) {
            pthread_cond_wait(&pFactory->taskQueue.cond, &pFactory > taskQueue.mutex);
            if (pFactory->runningFlag == 0) {
                puts("child exit");
                pthread_exit(NULL);
            }
        }

        printf("Get Task!\n");
        netFd = pFactory->taskQueue.pFront->netFd;
        taskDeQueue(&pFactory->taskQueue);
        pthread_cleanup_pop(1);
        handleEvent(netFd, pFactory);
        printf("pthread done! tid = %lu\n", pthread_self());
    }
}


int handleEvent(int netFd, factory_t *pFactory) {
    transFile(netFd);
    close(netFd);

    if (pFactory->runningFlag == 0) {
        puts("child exit");
        pthread_exit(NULL);
    }

    return 0;
}