Linux线程池
什么是池?
池是一组资源的集合,这组资源在服务器启动之初就被创建并初始化
开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取
服务器处理完一个客户连接后,可以把相关的资源放回池中
为什么要创建线程池?
当有客户端连接时,创建耗时为:线程时间+处理时间+销毁线程时间
(耗时T1+T2+T3)
若需要大量的线程来完成任务,且完成任务的时间比较短,这样花费在T1和T3上面的时间比较多,效率低。
因此,提前创建好一定数量的线程,可以避免花费大量时间在T1和T3上,提高效率
当提前创建的线程不够用怎么办?
创建线程池之前,可以先设置线程池的属性
min_thr_num//最小线程数
max_thr_num//最大线程数
default_thr_num//添加或删除线程的步长
live_thr_num;//当前线程数
busy_thr_num//忙碌线程数
可以设置线程池的最大线程数和最小线程数
设置步长的意思是,当默认创建的线程数量不够时,一次性拓展一定数目线程(步长),当达到最大线程数目的时候就无法拓展。或者当空闲线程数量过多时,一次性销毁一定数量线程,当到最小线程数时,无法删除
工作的机制

首先服务器维护两个条件变量(图中没有画出互斥锁),其中一个维护线程池和服务器任务队列(队列不为空),另外一个维护服务器任务队列和客户端(队列不为满)
先看线程池这一块
当任务队列当中没有任务时,线程池堵塞在条件变量上,等待任务
当有任务进来时,条件变量发信号或者广播,唤醒线程,此时对任务队列而言属于共享资源,需要使用互斥量,避免资源冲突
从生产者消费模型上看,服务器为生产者,线程池为消费者
服务器和客户端
同理当任务队列满的时候,客户端阻塞等待服务器连接
当队列不为满时,条件变量发信号或者广播,通知客户端进行链接,也需要使用互斥量操作任务队列
从生产者消费模型上看,服务器为消费者,客户端为生产
代码实现如下
代码中摘除了网络基础模块,自定义模拟客户端连接
main.c
#include "threadpool.h"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
void* mytask(void *arg)
{
printf("thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg);
sleep(1);
free(arg);
return NULL;
}
int main(void)
{
threadpool_t *pool;
threadpool_init(&pool, 3);
int i;
for (i=0; i<10; i++)
{
int *arg = (int *)malloc(sizeof(int));
*arg = i;
threadpool_add_task(pool, mytask, arg);
}
//sleep(15);
threadpool_destroy(pool);
return 0;
}
threadpool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include "condition.h"
// 任务结构体,将任务放入队列由线程池中的线程来执行
typedef struct task
{
void *(*run)(void *arg);// 任务回调函数
void *arg;// 回调函数参数
struct task *next;// 链表队列
} task_t;
// 线程池结构体
typedef struct threadpool
{
condition_t ready;//任务准备就绪或者线程池销毁通知
task_t *first;//任务队列头指针
task_t *last;//任务队列为指针
int counter;//线程池中当前线程数
int idle;//线程池中当前正在等待任务的线程数
int max_threads;//线程池中最大允许的线程数
int quit;//销毁线程池的时候置1
} threadpool_t;
// 初始化线程池
void threadpool_init(threadpool_t **pool, int threads);
// 往线程池中添加任务
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
// 销毁线程池
void threadpool_destroy(threadpool_t *pool);
#endif /* _THREAD_POOL_H_ */
threadpool.c
#include "threadpool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
void *thread_routine(void *arg)
{
struct timespec abstime;
threadpool_t *pool = (threadpool_t *)arg;
printf("thread 0x%x is starting\n", (int)pthread_self());
//1. 设置为自分离线程
pthread_detach(pthread_self());
//3. 进入轮训工作模式,如果不退出且队列不为空则一直工作
while(1)
{
//1. 先去看下有没有任务,有任务则处理任务,没有再wait
condition_lock(&pool->ready);
printf("thread 0x%x is working\n", (int)pthread_self());
if(pool->first != NULL)
{
task_t *t = pool->first; //取出第一个任务
pool->first = t->next; //修改队列头
condition_unlock(&pool->ready); //先解锁,提高效率
//处理任务
t->run(t->arg);
free(t);
continue; //既然本次有任务,可能下次还有任务,则继续查看是否有任务
}
else
{
condition_unlock(&pool->ready);
}
if(pool->quit)
{
break;
}
//2. 如果没有任务,则等待
printf("thread 0x%x is waiting\n", (int)pthread_self());
while(1)
{
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec += 2; //延时2s
//condition_wait(&pool->ready);
condition_lock(&pool->ready);
pool->idle++;
int status = condition_timedwait(&pool->ready, &abstime);
condition_unlock(&pool->ready);
if (status != ETIMEDOUT || pool->quit)
{
printf("thread 0x%x get signal\n", (int)pthread_self());
break;
}
else
{
printf("thread 0x%x is wait timed out\n", (int)pthread_self());
if(pool->counter >= 3)
{
goto THREAD_EXIT;
}
}
}
}
THREAD_EXIT:
printf("thread 0x%x is exit\n", (int)pthread_self());
condition_lock(&pool->ready);
pool->counter--;
condition_unlock(&pool->ready);
pthread_exit(NULL); //退出线程
}
void threadpool_init(threadpool_t **pool, int threads)
{
//1. 初始化基本的线程池参数
int i;
threadpool_t *newpool = malloc(sizeof(threadpool_t));
*pool = newpool;
newpool->max_threads = threads;
newpool->quit = 0;
newpool->idle = 0; //??
newpool->first = NULL;
newpool->last = NULL;
newpool->counter = 0;
condition_init(&newpool->ready);
//2. 默认有线程数,则在初始化的时候同时初始化N个线程
#if 1
for(i= 0; i < threads; i++)
{
pthread_t tid;
if(pthread_create(&tid, NULL, thread_routine, newpool) == 0)//where is task?
{
condition_lock(&newpool->ready);
newpool->counter++;
condition_unlock(&newpool->ready);
}
}
#endif
}
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
if(pool->quit)
return;
//1. 生成任务包
task_t *task = malloc(sizeof(task_t));
task->run = run;
task->arg = arg;
//2. 加入task队列, 先上锁,再添加,再解锁
printf("Add new task %p ! \n", task);
condition_lock(&pool->ready);
if(pool->last == NULL)
{
pool->last = task; //队列头
pool->first = pool->last; //初始化头
}
else
{
pool->last->next = task; // add
pool->last = task;
}
//3. 计算一下线程数是否满足任务处理速度,不满足则创建一批
if(pool->counter < pool->max_threads && pool->idle <= 0)
{
//??线程创建策略,根据实际环境选择
//策略1: 固定增长,每次增长??
//策略2: 指数增长,每次翻倍?? 也就是创建 pool->counter
//策略3: 线下增长,每次+1
// 策略4: 根据任务数量增长
pthread_t tid;
if(pthread_create(&tid, NULL, thread_routine, pool) == 0) //where is task?
{
pool->counter++;
}
}
//4. 通知线程去取任务处理
if(pool->idle > 0)
{
condition_signal(&pool->ready); //唤醒一个线程去处理任务
}
//5. 解锁
condition_unlock(&pool->ready);
}
void threadpool_destroy(threadpool_t *pool)
{
//1. 设置退出条件
pool->quit = 1;
//2. 等待所有线程退出
while(pool->counter > 0)
{
//3. 广播,通知所有线程退出
condition_lock(&pool->ready);
condition_broadcast(&pool->ready); //唤醒所有线程退出
condition_unlock(&pool->ready);
sleep(1);
}
//4. 销毁线程池对象
free(pool);
}
condition.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
} condition_t;
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t *cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif /* _CONDITION_H_ */
condition.c
#include "condition.h"
int condition_init(condition_t *cond)
{
int status;
if ((status = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if ((status = pthread_cond_init(&cond->pcond, NULL)))
return status;
return 0;
}
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
int condition_signal(condition_t *cond)
{
return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t* cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
int condition_destroy(condition_t* cond)
{
int status;
if ((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if ((status = pthread_cond_destroy(&cond->pcond)))
return status;
return 0;
}
Makefile
.PHONY:clean
CC=gcc
CFLAGS=-Wall -g
ALL=main
all:$(ALL)
OBJS=threadpool.o main.o condition.o
.c.o:
$(CC) $(CFLAGS) -c $<
main:$(OBJS)
$(CC) $(CFLAGS) $^ -o $@ -lpthread -lrt
clean:
rm -f $(ALL) *.o
相关推荐
-
PHP8种变量类型的详细讲解2025-02-22 00:32:24
-
php+apache 和 php+nginx的区别2025-02-22 00:21:27
-
PHP:与workerman结合实现定时任务2025-02-22 00:15:57
-
Nginx的Rewrite规则与实例2025-02-22 00:15:39
-
MySql中身份证字段的简单脱敏介绍2025-02-22 00:15:36