java

位置:IT落伍者 >> java >> 浏览文章

Java多线程同步设计中使用Metux


发布日期:2024年05月16日
 
Java多线程同步设计中使用Metux

Mutex是互斥体广泛地应用在多线程编程中本文以广为流程的Doug Lea的concurrent工具包的Mutex实现为例进行一点探讨在Doug Lea的concurrent工具包中Mutex实现了Sync接口该接口是concurrent工具包中所有锁(lock)门(gate)和条件变量(condition)的公共接口Sync的实现类主要有MutexSemaphore及其子类LatchCountDownReentrantLock等这也体现了面向抽象编程的思想使我们可以在不改变代码或者改变少量代码的情况下选择使用Sync的不同实现下面是Sync接口的定义

public interface Sync

{

public void acquire() throws InterruptedException;

//获取许可

public boolean attempt(long msecs) throws InterruptedException;

//尝试获取许可

public void release();

//释放许可

}

通过使用Sync可以替代Java synchronized关键字并提供更加灵活的同步控制当然并不是说concurrent工具包是和Java synchronized独立的技术其实concurrent工具包也是在synchronized的基础上搭建的从下面对Mutex源码的解析即可以看到这一点synchronized关键字仅在方法内或者代码块内有效而使用Sync却可以跨越方法甚至通过在对象之间传递跨越对象进行同步这是Sync及concurrent工具包比直接使用synchronized更加强大的地方

注意Sync中的acquire()和attempt()都会抛出InterruptedException所以使用Sync及其子类时调用这些方法一定要捕获InterruptedException而release()方法并不会抛出InterruptedException这是因为在acquire()和attempt()方法中可能会调用wait()等待其它线程释放锁而release()在实现上进行了简化直接释放锁不管是否真的持有所以你可以对一个并没有acquire()的线程调用release()这也不会有什么问题而由于release()不会抛出InterruptedException所以我们可以在catch或finally子句中调用release()以保证获得的锁能够被正确释放比如

class X

{

Sync gate; //

public void m()

{

try

{

gateacquire();

// block until condition holds

try

{

// method body

}

finally { gaterelease(); }

}

catch (InterruptedException ex) { // evasive action }

}

}

Mutex是一个非重入的互斥锁Mutex广泛地用在需要跨越方法的before/after类型的同步环境中下面是Doug Lea的concurrent工具包中的Mutex的实现

public class Mutex implements Sync

{

/** The lock status **/

protected boolean inuse_ = false;

public void acquire() throws InterruptedException

{

if (Threadinterrupted()) throw new InterruptedException();//()

synchronized(this)

{

try

{

while (inuse_) wait();

inuse_ = true;

}

catch (InterruptedException ex)

{

//()

notify();

throw ex;

}

}

}

public synchronized void release()

{

inuse_ = false;

notify();

}

public boolean attempt(long msecs) throws InterruptedException

{

if (Threadinterrupted()) throw new InterruptedException();

synchronized(this)

{

if (!inuse_)

{

inuse_ = true;

return true;

}

else if (msecs <= )

return false;

else

{

long waitTime = msecs;

long start = SystemcurrentTimeMillis();

try

{

for (;;)

{

wait(waitTime);

if (!inuse_)

{

inuse_ = true;

return true;

}

else

{

waitTime = msecs (SystemcurrentTimeMillis() start);

if (waitTime <= ) // ()

return false;

}

}

}

catch (InterruptedException ex)

{

notify();

throw ex;

}

}

}

}

}

为什么要在acquire()和attempt(方法的开始都要检查当前线程的中断标志呢?这是为了在当前线程已经被打断时可以立即返回而不会仍然在锁标志上等待调用一个线程的interrupt()方法根据当前线程所处的状态可能产生两种不同的结果当线程在运行过程中被打断则设置当前线程的中断标志为true如果当前线程阻塞于wait()sleep()join()则当前线程的中断标志被清空同时抛出InterruptedException所以在上面代码的位置()也捕获了InterruptedException然后再次抛出InterruptedException

release()方法简单地重置inuse_标志并通知其它线程

attempt()方法是利用Java的Objectwait(long)进行计时的由于Objectwait(long)不是一个精确的时钟所以attempt(long)方法也是一个粗略的计时注意代码中位置(在超时时返回

Mutex是Sync的一个基本实现除了实现了Sync接口中的方法外并没有添加新的方法所以Mutex的使用和Sync的完全一样在concurrent包的API中Doug给出了一个精细锁定的List的实现示例我们这儿也给出作为对Mutex和Sync使用的一个例子

class Node

{

Object item; Node next;

Mutex lock = new Mutex();

// 每一个节点都持有一个锁

Node(Object x Node n)

{

item = x;

next = n;

}

}

class List

{

protected Node head;

// 指向列表的头

// 使用Java的synchronized保护head域

// (我们当然可以使用Mutex但是这儿似乎没有这样做的必要

protected synchronized Node getHead()

{ return head; }

boolean search(Object x) throws InterruptedException

{

Node p = getHead();

if (p == null) return false;

// (这儿可以更加紧凑但是为了演示的清楚各种情况都分别进行处理)

plockacquire();

// Prime loop by acquiring first lock

// (If the acquire fails due to

// interrupt the method will throw

// InterruptedException now

// so there is no need for any

// further cleanup)

for (;;)

{

if (xequals(em))

{

plockrelease();

// 释放当前节点的锁

return true;

}

else

{

Node nextp = pnext;

if (nextp == null)

{

plockrelease();

// 释放最后持有的锁

return false;

}

else

{

try

{

nextplockacquire();

// 在释放当前锁之前获取下一个节点的锁

}

catch (InterruptedException ex)

{

plockrelease();

// 如果获取失败也释放当前的锁 throw ex;

}

plockrelease();

// 释放上个节点的锁现在已经持有新的锁了

p = nextp;

}

}

}

}

synchronized void add(Object x)

{

// 使用synchronized保护head域

head = new Node(x head);

}

// other similar traversal and update methods

}

上一篇:JAVA代码编写的30条建议

下一篇:Java中多线程之间可以通过接口来实现信息共享