利用Java的synchronized关键字,wait和notify方法,实现简易阻塞型消息队列


前言

wait和notify这一块在刚刚学Java的时候学过,但初次接触到现在也有一两年了,过于久远,而且平时开发里也几乎没怎么用到wait和notify,缺少实践,近期在研究高并发的系统设计,专门研究了下Java的锁和代码同步机制,顺便记录下这个学习过程。

开始

重新认识synchronized

一开始学习Java时接触到的synchronized用法是这样的

class 类名 {
    public synchronized void 方法名() {
        // 同步代码块
    }
}

这样就能保证了方法只能同时被1个线程调用,一开始以为只能这样用,后面才了解到其实还有这种写法(我太菜了)

class 类名 {
    public void 方法名() {
        // 表示获取当前对象实例的锁
        synchronized(this) {
            // 同步代码块
        }
    }
}

在方法体代码块定义中,synchronized括号后面可以跟上任意Java对象,表示获取该对象的锁,若使用synchronized修饰静态方法,则相对于获取该类(class)的锁,效果和如下代码一致

class Xx {
    public static void 方法名() {
        synchronized(Xx.class) {
            // 同步代码块
        }
    }
}

当一个线程准备进入synchronized代码块时,若无法获得到锁,则方法执行会发生阻塞,直到锁被其他线程释放,当前线程才能拿到锁,进入同步代码块,解除锁导致的运行阻塞。

以上就是Javasynchronized关键字的简单使用方法,可以说是比较方便的API了,不需要自己手动上锁解锁,至于这个关键字的具体实现原理和不同竞争条件下的锁升级之类的特性,有空再专门研究和写一篇文章。

wait()和sleep()

前面synchronized关键字的用法讲完了,该轮到wait和sleep了。可能会有点好奇,为什么wait和sleep要一块讲?因为我一开始也搞不懂他们的区别,只知道wait()是Object类的实例方法,sleep()是Thread类的实例方法。作用都是使让代码停下来。

但后来研究了下,两者使用起来还是有一定的差别的。

首先,wait必须要在获得了当前对象的对象锁代码块中执行(可以理解为调用哪个对象的wait方法,调用的时候就要处于哪个对象的同步代码块中),而sleep则没有这个限制。

public synchronized void methodName() {
    this.wait(); //可以 没问题
}

public void methodName() {
    this.wait(); //不可以,没有处于this的同步代码块中,抛出IllegalMonitorStateException
}

public void methodName2() {
    synchronized(其他对象) {
        this.wait() // 不可以,这里只获得了其他对象的锁,但是却调用了this的wait,抛出IllegalMonitorStateException
    }
}

public void methodName3() {
    synchronized(其他对象) {
        其他对象.wait() // 可以,没问题,不一定非得是this
    }
}

也许你注意到了,wait()必须配合synchronized使用?为什么呢?那就是wait()有个很有意思的特性,执行wait()后的睡眠期间,会释放掉通过synchronized获得的锁,直到被唤醒的时候会重新获得锁,而sleep()会在线程睡眠期间依然持有这个锁,也就是说,在同步代码块中用wait()睡眠后,其他线程也能够获得对象的锁。

wait()的睡眠状态被notity()或notifyAll()唤醒时,不会抛出异常,sleep()导致的睡眠只能被interrupt()方法唤醒,同时sleep()会抛出异常,若interrupt()唤醒的是由wait()导致的睡眠,wait()同样会抛出异常。

同时,由于wait()会释放掉锁,因此一个对象中可能会同时存在多个线程由于wait()导致的睡眠

notify()与notifyAll()

notify()会将对象的一个线程从wait()导致的睡眠状态中唤醒,至于是哪个线程,则是随机的。

而notifyAll()则是唤醒对象所有由于wait()导致的睡眠。由于wait()被唤醒后会对对象加锁,如果没能拿到锁会暂时先阻塞着直到有机会拿到锁,因此依然是线程安全的。

简易消息队列的实现

现在就利用上面的特性,实现一个简单的阻塞型消息队列吧。

消息队列是一个典型的消费者-生产者模型,当消费者消费时,若消息队列为空,则一直等待(阻塞执行),直到生产者生产了一个消息,此时消费者阻塞消除并获得一个消息

这里阻塞的关键是队列空时等待,非空时唤醒

消息队列类完整代码如下:

/**
 * 使用锁实现的消息队列
 */
class MessageQueue {
    // 队列关闭标志
    private volatile boolean stop = false;
    // 使用链表作为消息容器
    private final LinkedList<String> queue = new LinkedList<>();
    /**
     * 生产消息
     * @param msg   消息内容
     */
    public void product(String msg) {
        // 获取链表的锁之后,向链表添加消息,最后尝试唤醒链表对象(如果在等待的话)
        synchronized (this) {
            if (stop) {
                throw new IllegalStateException("消息队列已关闭");
            }
            queue.add(msg);
            this.notify();
        }
    }
    /**
     * 关闭消息队列,并唤醒消费者的等待(如果在等待的话)
     */
    public void stop() {
        synchronized (this) {
            stop = true;
            this.notify();
        }
    }
    /**
     * 消费消息,若无消息将会一直阻塞,直到有消息或队列关闭,若消息队列已关闭则不再阻塞且返回null
     * @return 拿到的消息
     */
    public String consume() {
        // 获取对象的锁
        synchronized (this) {
            if (queue.size() != 0) {
                return queue.remove();
            } else {
                while (!stop && queue.size() == 0) {
                    try {
                        this.wait();
                    } catch (InterruptedException ignored) { }
                }

                // 由于关闭队列也有可能退出上面的循环,所以要再判断一次长度
                if (queue.size() == 0) {
                    return null;
                } else {
                    return queue.remove();
                }
            }
        }
    }
}

测试代码如下

public class Main {
    public static void main(String[] args) throws InterruptedException {
        MessageQueue mq = new MessageQueue();

        // 开1个线程消费消息
        new Thread(() -> {
           while (true) {
               String msg = mq.consume();
               System.out.println("消费消息:" + msg);
               if (msg == null) {
                   System.out.println("队列关闭");
                   break;
               }
           }
        }).start();
        for (int i = 0; i < 10; i++) {
            mq.product("" + i);
            System.out.println("生产消息:" + i);
            Thread.sleep(100);
        }
        mq.stop();
        Thread.sleep(1000);
        System.out.println("主线程结束");
    }
}

运行可以观察到每0.1秒生产一个消息,紧接着就被消费,最后程序退出。

性能

消费方法多次判断size(),代码同步简单粗暴。性能应该不咋地,后面有空再研究高性能并发队列的实现(当然jdk类库里也有相关实现)。