高并发下的缓存设计

背景

工业界有很多成熟的系统,大多数都是分布式的缓存系统,如redis,memcache,Ehcache等,它们被广泛运用于互联网公司中。缓存系统的一个重要应用就是减小数据库的压力,也可以缓存jsp,静态HTML,图片等资源。

这次说的缓存,并不是大型的分布式缓存系统,也没有类似LRU等过期机制,它仅仅是一个十分简单的HashMap对象。但是,它主要是用来解决高并发下,对象仅仅被创建一次,并且创建对象的过程是个耗时操作。在高并发的情况下,多个线程可能需要同样一个数据,如果它们同时发现这个数据在内存中没有,那么,它们就都会去经过计算,如果这个计算是一个很耗时,那么就会很大程度的浪费系统的CPU资源。当然,concurrentHashMap可以最终保证数据的唯一性,但是却没办法解决计算的唯一性。

因此,对于一个进程内,高并发下执行耗时操作创建同一个对象,对象仅被创建一次。我们使用FutureTask+concurrentHashMap来实现目标功能

代码示例

计算接口

1
2
3
4
5
package cache;
public interface Computable<A,V> {
V compute(A arg) throws InterruptedException;
}

接收一个A类型的参数,经过计算,输出V类型的数据。

计算任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package helloworld;
import cache.Computable;
@SuppressWarnings("hiding")
public class Task<String,Ojbect> implements Computable<String,Ojbect>{
@Override
public Ojbect compute(String arg) throws InterruptedException {
System.out.println("start computer");
int a = 1;
for(int i = 0;i < 2000;i++){
for(int j = 0;j < 200;j++){
for(int k = 0;k < 200;k++){
for(int h = 0;h < 2;h++){
a = -a;
}
}
}
}
return (Ojbect) Integer.valueOf(a);
}
}

实现了Computable接口,实现了计算过程,模拟一个相对耗时的操作

缓存Map(CacheMap)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package cache;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CacheMap<A,V>{
private final Map<A,FutureTask<V>> container = new ConcurrentHashMap<A,FutureTask<V>>();
public V compute(A key, Computable<A,V> task) throws InterruptedException {
FutureTask<V> ret = container.get(key);
if(ret == null){
Callable<V> taskWarpper = new Callable<V>(){
public V call() throws InterruptedException {
return task.compute(key);
}
};
/*ret = new FutureTask<V>(taskWarpper);
container.put(key, ret);
ret.run();*/
FutureTask<V> ft = new FutureTask<V>(taskWarpper);
ret = container.putIfAbsent(key, ft);
if(ret == null){
System.out.println(Thread.currentThread().getName() + ": " + key + " start calculate");
ret = ft;
ret.run();
Thread.sleep(1000);
ret.run();
}
else{
System.out.println(Thread.currentThread().getName() + ": " + key + " is already in the container, please waiting....");
}
}
try{
return ret.get();
} catch(CancellationException e){
container.remove(key);
} catch (ExecutionException e) {
throw (InterruptedException)e.getCause();
}
return null;
}
public V start(Computable<A,V> task){
return null;
}
}

CacheMap是缓存数据的结构体,目标是以A类型为key,V类型为value的HashMap。里面封装了一个concurrentHashMap,以A类型为key,以FutureTask<V>为value。

当在Map里没找到时,表示结果对象还没创建,也可能是另一个线程正在创建。新建一个Callable的任务,并用FutureTask包起来

1
2
3
4
5
6
7
8
9
Callable<V> taskWarpper = new Callable<V>(){
public V call() throws InterruptedException {
return task.compute(key);
}
};
/*ret = new FutureTask<V>(taskWarpper);
container.put(key, ret);
ret.run();*/
FutureTask<V> ft = new FutureTask<V>(taskWarpper);

尝试将FutureTask<V>放到Map中,如果成功,返回null。由于concurrentHashMap是线程安全的,只有一个能够放进去,其它则会阻塞等待。ret.run()则会创建新的线程去执行计算,其它的会到ret.get()阻塞等待结果。