Scenario

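import java.util.List;

// Holds the scored payload together with its score.
public class ScoredItems {
    public Object items;
    public double score;

    public ScoredItems(Object items, double score) {
        this.items = items;
        this.score = score;
    }
}

// Scores a batch of items; a single call accepts at most 100 items.
public interface Scorer {
    List<ScoredItems> score(List<Object> items);
}

public class Ranker {
    private Scorer scorer;

    public List<ScoredItems> rank(List<Object> items) {
        // TODO: score all items and return them sorted by score, descending
    }
}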

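Problem

Implement Ranker.rank so that it returns a List<ScoredItems> sorted by score in descending order across all items. The items passed to rank contain at most 10000 elements, while a single call to Scorer.score accepts at most 100.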
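Because Scorer.score takes at most 100 items per call, a 10000-item input has to be split into as many as 100 batches. The following is a minimal sketch of that split using only the JDK; the class name Batches and the method partition are illustrative and not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

class Batches {
    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // subList returns a view of the original list, not a copy
            batches.add(items.subList(from, to));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            items.add(i);
        }
        System.out.println(partition(items, 100).size()); // prints 100
    }
}
```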

Implementation 1

Use a thread pool to process the batched requests.

public List<ScoredItems> rank(List<Object> items)
        throws InterruptedException, ExecutionException {
    // Split into batches of 100 (Lists.partition is from Guava).
    List<List<Object>> partitions = Lists.partition(items, 100);

    if (partitions.size() == 1) {
        return scorer.score(items);
    }

    List<Future<List<ScoredItems>>> futures = new ArrayList<>();

    // Submit one scoring task per batch.
    for (List<Object> partition : partitions) {
        Future<List<ScoredItems>> future = threadPoolExecutor.submit(() -> scorer.score(partition));
        futures.add(future);
    }

    List<ScoredItems> results = new ArrayList<>();

    // Future.get() throws checked exceptions; they are only propagated here, not handled.
    for (Future<List<ScoredItems>> f : futures) {
        List<ScoredItems> result = f.get();
        results.addAll(result);
    }

    // Sort by score, descending.
    results.sort((o1, o2) -> Double.compare(o2.score, o1.score));

    return results;
}

The thread pool is initialized as follows:

threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60, TimeUnit.SECONDS, workQueue);

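Notes

  • The items are split into batches, and the batches are submitted to a thread pool so that the scoring requests run concurrently.
  • Shortcomings: 1. there is no exception handling; 2. it is unclear whether the thread pool settings are reasonable.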
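On the second shortcoming, one common way to make a pool's behavior under load explicit is to bound its queue and choose a rejection policy. The sketch below is only an illustration under assumed numbers: the class ScoringPools, the method newBoundedPool, and the sizes used are hypothetical and would need tuning against real traffic.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScoringPools {
    /** Builds a fixed-size pool with a bounded queue (hypothetical helper). */
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself,
                // which slows the producer down instead of dropping work.
                new ThreadPoolExecutor.CallerRunsPolicy());
        // Let idle core threads exit after the keep-alive time.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newBoundedPool(4, 16);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 100; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(completed.get());
    }
}
```

CallerRunsPolicy is one choice among several; a service that prefers to fail fast might use AbortPolicy and turn the rejection into an error response instead.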

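Implementation 2

Add exception handling, with timeouts handled as a special case. The rule is that if any single score request fails, the rank call as a whole must throw.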

for (Future<List<ScoredItems>> f : futures) {
    try {
        // Each get() may wait up to `timeout`, so the waits can add up across the loop.
        List<ScoredItems> result = f.get(timeout, TimeUnit.MILLISECONDS);
        results.addAll(result);
    } catch (TimeoutException e) {
        throw new RPCTimeoutException("timeout");
    } catch (ExecutionException | InterruptedException e) {
        throw new SystemException("other exception");
    }
}

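Notes

  1. Calling future.get(timeout, TimeUnit.MILLISECONDS) inside the loop lets the timeouts accumulate: in the worst case the total wait approaches the number of futures multiplied by the timeout. We want every call to rank to have a fixed latency bound, so this pattern is flawed.
  2. Only the request that failed is handled. rank has already thrown its exception, but the other requests keep running in the thread pool.
  3. Would future.cancel stop the other tasks still running in the pool? (This only matters for non-timeout failures. If a timeout has already occurred, the other futures have most likely either completed or failed by the time we try to cancel them.)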
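One way to keep plain Future while avoiding the accumulated wait from note 1 is to compute a single deadline up front and give each get() only the time that remains. This is a minimal sketch, not the author's implementation; DeadlineDemo and getAllWithin are hypothetical names, and the cancel(true) call on failure is an added assumption about the desired cleanup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeadlineDemo {
    /** Collects all results within one overall budget instead of one timeout per future. */
    static <T> List<T> getAllWithin(List<Future<T>> futures, long budgetMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
        List<T> results = new ArrayList<>();
        try {
            for (Future<T> f : futures) {
                long remaining = deadline - System.nanoTime();
                // With no time left, get() does not wait: it returns a finished
                // result or throws TimeoutException right away.
                results.add(f.get(Math.max(remaining, 0), TimeUnit.NANOSECONDS));
            }
            return results;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // Ask the remaining tasks to stop; finished futures ignore this.
            for (Future<T> f : futures) {
                f.cancel(true);
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= 3; i++) {
                final int n = i;
                futures.add(pool.submit(() -> {
                    Thread.sleep(50);
                    return n * 2;
                }));
            }
            System.out.println(getAllWithin(futures, 2000)); // prints [2, 4, 6]
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With this shape the total wait is bounded by the budget no matter how many futures are in the list.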

Implementation 3

public List<ScoredItems> rank(List<Object> items) throws RPCTimeoutException {

    List<List<Object>> partitions = Lists.partition(items, 100);
    if (partitions.size() == 1) {
        return scorer.score(items);
    }

    // Score every partition asynchronously, each with its own timeout.
    List<CompletableFuture<List<ScoredItems>>> allScored = partitions
            .stream()
            .map(partition ->
                    CompletableFuture.supplyAsync(() -> scorer.score(partition), executor)
                            .orTimeout(timeout, TimeUnit.MILLISECONDS))
            .toList();

    List<ScoredItems> result = new ArrayList<>();
    for (CompletableFuture<List<ScoredItems>> pf : allScored) {
        try {
            List<ScoredItems> scored = pf.join();
            result.addAll(scored);
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                logger.warn("[rank] found TimeoutException");
                throw new RPCTimeoutException(e.getMessage());
            } else {
                // pass: non-timeout failures are ignored, so this partition's results are dropped
            }
        } catch (CancellationException e) {
            // pass
        }
    }
    // Sort by score, descending.
    result.sort((o1, o2) -> Double.compare(o2.score, o1.score));
    return result;
}

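Notes

  • CompletableFuture.orTimeout puts a timeout on each request, and the results are collected with join().
  • For this kind of fan-out, CompletableFuture is usually the better choice.
  • What does CompletableFuture.cancel(true) actually do? Does it free up thread pool resources?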
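As a variation on the per-request timeout, a single timeout can be attached to the whole batch by combining the futures with CompletableFuture.allOf. The sketch below assumes a generic scoring function in place of scorer.score; BatchFanOut, scoreAll, and scoreFn are hypothetical names. Note that allOf waits for every future before completing, so on its own it does not fail fast when one partition fails early.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

class BatchFanOut {
    /**
     * Runs scoreFn on every partition and returns the concatenated results.
     * join() throws CompletionException if the batch times out or any partition fails.
     */
    static <T, R> List<R> scoreAll(List<List<T>> partitions,
                                   Function<List<T>, List<R>> scoreFn, // stand-in for scorer::score
                                   Executor executor,
                                   long timeoutMillis) {
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (List<T> partition : partitions) {
            futures.add(CompletableFuture.supplyAsync(() -> scoreFn.apply(partition), executor));
        }
        // One timeout for the whole batch rather than one per request.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
        List<R> results = new ArrayList<>();
        for (CompletableFuture<List<R>> f : futures) {
            results.addAll(f.join()); // already complete, returns immediately
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<List<Integer>> partitions = List.of(List.of(1, 2), List.of(3, 4), List.of(5));
            Function<List<Integer>, List<Integer>> doubleAll =
                    p -> p.stream().map(x -> x * 2).collect(Collectors.toList());
            System.out.println(scoreAll(partitions, doubleAll, pool, 1000)); // prints [2, 4, 6, 8, 10]
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller can inspect the cause of the CompletionException, as Implementation 3 does, to tell a TimeoutException apart from a scoring failure.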

An experiment with CompletableFuture.cancel(true)

public static void groupCompletableFutureDemo() {
    // Two tasks that each sleep 1000 ms; their timeouts are 900 ms and 1800 ms.
    List<CompletableFuture<Integer>> futures = Stream.of(1, 2)
            .map(o -> CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                    logger.info("[GroupCompletableFuture] finish execution got result {}", o * 2);
                    return o * 2;
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }, gExecutorService).orTimeout(o * 900, TimeUnit.MILLISECONDS))
            .toList();

    int timeoutExceptions = 0;
    int cancelExceptions = 0;

    for (CompletableFuture<Integer> future : futures) {
        try {
            Integer result = future.join();
            logger.info("[GroupCompletableFuture] got result: {}", result);
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                timeoutExceptions += 1;
                logger.warn("[GroupCompletableFuture] got TimeoutException correctly done: {}", future.isDone());
                // On the first timeout, cancel every future that has not finished yet.
                for (CompletableFuture<Integer> f : futures) {
                    if (!f.isDone() && !f.isCancelled()) {
                        f.cancel(true);
                    }
                }
            }
        } catch (CancellationException e) {
            cancelExceptions += 1;
            logger.warn("[GroupCompletableFuture] got cancel exception");
        }
    }

    logger.info("[GroupCompletableFuture] timeout exceptions: {}, cancel exceptions: {}",
            timeoutExceptions, cancelExceptions);
}

// output
2024-07-26 11:48:48 WARN ConcurrentScoreDemo:434 - [GroupCompletableFuture] got TimeoutException correctly done: true
2024-07-26 11:48:48 WARN ConcurrentScoreDemo:443 - [GroupCompletableFuture] got cancel exception
2024-07-26 11:48:48 INFO ConcurrentScoreDemo:447 - [GroupCompletableFuture] timeout exceptions: 1, cancel exceptions: 1
2024-07-26 11:48:48 INFO ConcurrentScoreDemo:415 - [GroupCompletableFuture] finish execution got result 4
2024-07-26 11:48:48 INFO ConcurrentScoreDemo:415 - [GroupCompletableFuture] finish execution got result 2

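Notes

  • CompletableFuture.cancel(true) does not stop the thread that is running the task; it only changes the state of the future itself. In the log above, both tasks still run to completion after the cancel.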
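For contrast, the Future returned by ExecutorService.submit does interrupt its worker on cancel(true), whereas CompletableFuture documents that the mayInterruptIfRunning flag has no effect. The following is a small sketch that observes the difference; CancelDemo and interruptedByCancel are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDemo {
    /** Returns true if the task body was interrupted by cancel(true). */
    static boolean interruptedByCancel(boolean useCompletableFuture) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Runnable body = () -> {
            started.countDown();
            try {
                Thread.sleep(300); // simulated work
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
        };

        try {
            Future<?> future = useCompletableFuture
                    ? CompletableFuture.runAsync(body, pool)
                    : pool.submit(body);
            started.await();     // make sure the task is running before cancelling
            future.cancel(true);
            finished.await();    // wait until the task body has actually ended
            return interrupted.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptedByCancel(false)); // prints true
        System.out.println(interruptedByCancel(true));  // prints false
    }
}
```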

Using Thread.interrupt() to change a thread's running state

public static void longRunningTask(ScheduledExecutorService scheduledExecutorService, long timeout)
        throws InterruptedException {

    // The pool thread that is running this task.
    Thread thread = Thread.currentThread();

    AtomicBoolean done = new AtomicBoolean(false);

    // Interrupt the worker if the task is still running when the timeout fires.
    scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            if (done.compareAndSet(false, true)) {
                thread.interrupt();
            }
        }
    }, timeout, TimeUnit.MILLISECONDS);

    // Simulate time-consuming work.
    Thread.sleep(1000);
    // Mark completion so a timeout that fires later does not interrupt the worker.
    done.set(true);
}

public static void realInterruptTaskDemo() throws InterruptedException {

    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);

    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 60, TimeUnit.MILLISECONDS, workingQueue);

    long start = System.currentTimeMillis();

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        try {
            longRunningTask(scheduledExecutorService, 800);
            logger.info("[realInterruptTaskDemo.longRunningTask] finished");
            return 1;
        } catch (InterruptedException e) {
            logger.info("[realInterruptTaskDemo.longRunningTask] canceled: {}", e.getMessage());
        }
        return null;
    }, executor);

    logger.warn("[realInterruptTaskDemo] active thread count: {}", executor.getActiveCount());

    try {
        future.join();
    } catch (CompletionException e) {
        logger.warn("[realInterruptTaskDemo] CompletionException: {}", e.getMessage());
    } catch (CancellationException e) {
        logger.warn("[realInterruptTaskDemo] CancellationException: {}", e.getMessage());
    }

    logger.warn("[realInterruptTaskDemo] task is still running after join active thread count: {}, consumes: {}", executor.getActiveCount(),
            System.currentTimeMillis() - start);

    if (future.isCompletedExceptionally()) {
        logger.warn("[realInterruptTaskDemo] future is error");
    }

    scheduledExecutorService.shutdown();
    executor.shutdown();
}

// output
2024-07-26 11:59:50 WARN ConcurrentScoreDemo:281 - [realInterruptTaskDemo] active thread count: 1
2024-07-26 11:59:51 INFO ConcurrentScoreDemo:276 - [realInterruptTaskDemo.longRunningTask] canceled: sleep interrupted
2024-07-26 11:59:51 WARN ConcurrentScoreDemo:291 - [realInterruptTaskDemo] task is still running after join active thread count: 0, consumes: 803

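Notes

  • After Thread.interrupt(), the sleep that should have lasted 1000 ms was cut short at about 800 ms, and the pool's active thread count dropped to 0.
  • What does this approach cost? An active count of 0 only means that no worker is currently executing a task. The task caught the InterruptedException and returned, so the worker thread goes back to the pool rather than being destroyed. The real cost is complexity: the task has to cooperate with interruption, and the interrupt has to be guarded so that it cannot land on a later task running on the same worker. In production the underlying call usually has its own timeout handling, so it is more reasonable to let the request run to completion in the pool than to interrupt the thread.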

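Summary

  • When handling batched requests, CompletableFuture makes exception handling cleaner and more concise. Use orTimeout to add a timeout to each call, and avoid calling future.get(timeout) by hand inside a loop.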