线程总结(九):自定义线程池

Java 已经为我们提供了一系列创建线程池的方法,这些方法可以完全满足我们的需求,所以说,不提倡自己编写线程池,但是为了理解线程池的概念,手写一个简单的线程池。

忘了在哪里看过一个问题:线程池存在的意义是什么?

其中有一条回答是:线程的创建和关闭所消耗的资源,要远远大于线程空转所消耗的资源。

这就可以理解线程池的本质:有若干个空转的线程,一直等待着,等待着执行任务,当有任务给他,他去执行,执行完成之后,继续回到空转的状态。

在理解了线程池的本质之后,我们来写一个简单的线程池实现:

首先,得有一个集合,来存储我们的任务:

LinkedList<Runnable> runnables = null;

当我们创建线程池时,实例化这个集合:

public class MyThreadPool {
    LinkedList<Runnable> runnables = null;
    
    public MyThreadPool(){
        runnables = new LinkedList<Runnable>();
    }
}

然后我们需要创建几个空转的线程,而这些线程是从我们的任务集合中拿任务的,如果任务集合是空的,那么线程就一直空转着,如果有任务,就拿来执行。

一般来说,我们开启的线程数是我们电脑 CPU 数量的1.5~2 倍,开的多了效率反而下降。在这里我们(随便)设定一个值用来控制线程的开启数量

int maxThreadCount = 5;

当然,还需要一个值来确定正在运行的线程的数量,每执行一个线程,该值就自动加 1,假如一个线程还没来得及执行 +1 操作另外一个线程就开启了呢?所以需要将该值设置为 volatile:

volatile int runningThreadCount = 0;

接下来就是传入任务了,一个 execute(Runnable runnable) 方法,使得我们可以将任务传入到集合中,每添加一个任务,然后判断正在运行的线程数是否超过之前设定的最大值,如果没有超过,则新建一个线程并且启动它,然后给 runningThreadCount +1:

    public void execute(Runnable runnable) {
            runnables.add(runnable);
            if (runningThreadCount < maxThreadCount) {
                new MyThread().start();
                runningThreadCount++;
            }
    }

接下来就是一个线程类,去循环的从集合中拿到任务并执行,如果任务集合不为空,就从中拿到任务然后执行其 run 方法:

    class MyThread extends Thread {
        @Override
        public void run() {
            for (;;) {
                if (runnables.size() > 0) {
                    runnables.removeFirst().run();
                }
            }
        }
    }

这样下来似乎可以了,但是还存在一个问题就是,当集合中任务执行完之后,线程依旧会一直空转,这样也是浪费资源嘛,所以还需要修改一下,在没有任务可以执行的时候线程休眠,等有了任务再唤醒线程让他去执行:

    public void execute(Runnable runnable) {
        runnables.add(runnable);
        synchronized (runnables) {
            runnables.notifyAll();
        }
        if (runningThreadCount < maxThreadCount) {
            new MyThread().start();
            runningThreadCount++;
        }
    }
    
    class MyThread extends Thread {
        @Override
        public void run() {
            synchronized (runnables) {
                for (;;) {
                    if (runnables.size() > 0) {
                        runnables.removeFirst().run();
                    } else {
                        try {
                            runnables.wait();
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }    

这样就 OK 了,然后再添加一个关闭线程池的方法,当线程关闭之后:

  1. 无法再添加新任务
  2. 任务集合中的任务执行完之后,销毁所有线程。
boolean isShutdown = false;

    public void execute(Runnable runnable) {
        if (!isShutdown) {
            runnables.add(runnable);
            synchronized (runnables) {
                runnables.notifyAll();
            }
            if (runningThreadCount < maxThreadCount) {
                new MyThread().start();
                runningThreadCount++;
            }
        }
    }

    public void shutdown() {
        isShutdown = true;
    }

    class MyThread extends Thread {
        @Override
        public void run() {
            synchronized (runnables) {
                for (;;) {
                    if (runnables.size() > 0) {
                        runnables.removeFirst().run();
                    } else {
                        if (isShutdown) {
                            break;
                        } else {
                            try {
                                runnables.wait();
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        }
    }

完整的代码是:

public class MyThreadPool {
    LinkedList<Runnable> runnables = null;
    int maxThreadCount = 5;
    volatile int runningThreadCount = 0;
    boolean isShutdown = false;

    public MyThreadPool() {
        runnables = new LinkedList<Runnable>();
    }

    public void execute(Runnable runnable) {
        if (!isShutdown) {
            runnables.add(runnable);
            synchronized (runnables) {
                runnables.notifyAll();
            }
            if (runningThreadCount < maxThreadCount) {
                new MyThread().start();
                runningThreadCount++;
            }
        }
    }

    public void shutdown() {
        isShutdown = true;
    }

    class MyThread extends Thread {
        @Override
        public void run() {
            synchronized (runnables) {
                for (;;) {
                    if (runnables.size() > 0) {
                        runnables.removeFirst().run();
                    } else {
                        if (isShutdown) {
                            break;
                        } else {
                            try {
                                runnables.wait();
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        }
    }
}

测试代码:

public class TestMyThreadPool {
    public static void main(String[] args) {
        MyThreadPool pool = new MyThreadPool();

        pool.execute(new MyTask(1));
        pool.execute(new MyTask(2));
        pool.execute(new MyTask(3));
        pool.execute(new MyTask(4));
        pool.execute(new MyTask(5));

        try {
            Thread.currentThread().sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        pool.execute(new MyTask(6));
        pool.execute(new MyTask(7));
        pool.execute(new MyTask(8));
        pool.shutdown();
        pool.execute(new MyTask(9));
        pool.execute(new MyTask(10));
        pool.execute(new MyTask(11));
        pool.execute(new MyTask(12));

    }
}

class MyTask implements Runnable {

    int i = 0;

    public MyTask(int i) {
        this.i = i;
    }

    @Override
    public void run() {
        try {
            System.out.println("任务体 " + i + " 开始执行,Thread ID = " + Thread.currentThread().getId() + ",Priority = "
                    + Thread.currentThread().getPriority());
            Thread.sleep(1000);
            System.out.println("任务体 " + i + " 执行完毕");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

执行结果:

任务体 1 开始执行,Thread ID = 9,Priority = 5
任务体 1 执行完毕
任务体 2 开始执行,Thread ID = 9,Priority = 5
任务体 2 执行完毕
任务体 3 开始执行,Thread ID = 9,Priority = 5
任务体 3 执行完毕
任务体 4 开始执行,Thread ID = 9,Priority = 5
任务体 4 执行完毕
任务体 5 开始执行,Thread ID = 9,Priority = 5
任务体 5 执行完毕
任务体 6 开始执行,Thread ID = 13,Priority = 5
任务体 6 执行完毕
任务体 7 开始执行,Thread ID = 13,Priority = 5
任务体 7 执行完毕
任务体 8 开始执行,Thread ID = 13,Priority = 5
任务体 8 执行完毕

通过线程 ID 可以看出一共有两个线程在执行任务(实际上只是其他几个线程没有抢到 CPU 权限而已)。

就这样,一个简单的线程池就实现了。

严正声明 实际编程中并不建议自己去写线程池,因为 Java 提供的几种创建线程池的方法已经足够满足日常使用了,自己写不光容易出错,效率也不好

虽然线程池是构建多线程应用的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。

线程池使用的风险

死锁

任何多线程应用程序都有死锁风险。当一组进程或线程中的每一个都在等待一个只有该组中另一个进程才能引起的事件时,我们就说这组进程或线程 死锁了。死锁的最简单情形是:线程 A 持有对象 X 的独占锁,并且在等待对象 Y 的锁,而线程 B 持有对象 Y 的独占锁,却在等待对象 X 的锁。除非有某种方法来打破对锁的等待(Java 锁定不支持这种方法),否则死锁的线程将永远等下去。

虽然任何多线程程序中都有死锁的风险,但线程池却引入了另一种死锁可能,在那种情况下,所有池线程都在执行已阻塞的等待队列中另一任务的执行结果的任务,但这一任务却因为没有未被占用的线程而不能运行。当线程池被用来实现涉及许多交互对象的模拟,被模拟的对象可以相互发送查询,这些查询接下来作为排队的任务执行,查询对象又同步等待着响应时,会发生这种情况。

资源不足

线程池的一个优点在于:相对于其它替代调度机制(有些我们已经讨论过)而言,它们通常执行得很好。但只有恰当地调整了线程池大小时才是这样的。线程消耗包括内存和其它系统资源在内的大量资源。除了 Thread 对象所需的内存之外,每个线程都需要两个可能很大的执行调用堆栈。除此以外,JVM 可能会为每个 Java 线程创建一个本机线程,这些本机线程将消耗额外的系统资源。最后,虽然线程之间切换的调度开销很小,但如果有很多线程,环境切换也可能严重地影响程序的性能。

如果线程池太大,那么被那些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比您实际需要的线程可能会引起资源匮乏问题,因为池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。除了线程自身所使用的资源以外,服务请求时所做的工作可能需要其它资源,例如 JDBC 连接、套接字或文件。这些也都是有限资源,有太多的并发请求也可能引起失效,例如不能分配 JDBC 连接。

并发错误

线程池和其它排队机制依靠使用 wait() 和 notify() 方法,这两个方法都难于使用。如果编码不正确,那么可能丢失通知,导致线程保持空闲状态,尽管队列中有工作要处理。使用这些方法时,必须格外小心;即便是专家也可能在它们上面出错。而最好使用现有的、已经知道能工作的实现。

线程泄漏

各种类型的线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个 RuntimeException 或一个 Error 时。如果池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终就为空,而且系统将停止,因为没有可用的线程来处理任务。

有些任务可能会永远等待某些资源或来自用户的输入,而这些资源又不能保证变得可用,用户可能也已经回家了,诸如此类的任务会永久停止,而这些停止的任务也会引起和线程泄漏同样的问题。如果某个线程被这样一个任务永久地消耗着,那么它实际上就被从池除去了。对于这样的任务,应该要么只给予它们自己的线程,要么只让它们等待有限的时间。

请求过载

仅仅是请求就压垮了服务器,这种情况是可能的。在这种情形下,我们可能不想将每个到来的请求都排队到我们的工作队列,因为排在队列中等待执行的任务可能会消耗太多的系统资源并引起资源缺乏。在这种情形下决定如何做取决于您自己;在某些情况下,您可以简单地抛弃请求,依靠更高级别的协议稍后重试请求,您也可以用一个指出服务器暂时很忙的响应来拒绝请求。

有效使用线程池的准则

只要您遵循几条简单的准则,线程池可以成为构建服务器应用程序的极其有效的方法:

  • 不要对那些同步等待其它任务结果的任务排队。这可能会导致上面所描述的那种形式的死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行,因为所有的线程都很忙。
  • 在为时间可能很长的操作使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。这样做保证了:通过将某个线程释放给某个可能成功完成的任务,从而将最终取得 某些进展。
  • 理解任务。要有效地调整线程池大小,您需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的(CPU-bound)吗?它们是 I/O 限制的(I/O-bound)吗?您的答案将影响您如何调整应用程序。如果您有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队列可能会有意义,这样可以相应地调整每个池。

参考资料

Java 理论与实践: 线程池与工作队列