0%

将异步接口转成同步接口

将短信发送的异步回调接口改为同步接口,核心思路是:发起短信请求后,在后端服务内阻塞等待回调通知,直到超时或收到明确结果。以下是具体实现方案、代码示例和注意事项。

一、核心原理

  1. 异步转同步的核心:用「阻塞等待 + 回调通知」替代直接返回。发起短信后,主线程阻塞,直到收到回调(成功 / 失败)或超时。
  2. 关键依赖:需要一个「线程安全的临时存储」(如 Redis、内存缓存)和「阻塞工具」(如 CountDownLatchFutureTask)。
  3. 流程拆解
    • 生成唯一请求 ID(如 UUID),作为短信发送的唯一标识。
    • 发起短信请求时,将请求 ID 与阻塞工具关联,存入临时存储。
    • 主线程阻塞等待结果。
    • 短信平台回调时,根据请求 ID 找到对应的阻塞工具,唤醒主线程并返回结果。

二、方案选择(按推荐优先级排序)

方案 1:Redis + CountDownLatch(分布式场景首选)

  • 适用场景:微服务、多实例部署(需跨进程共享状态)。
  • 优势:分布式兼容、高可用、支持超时控制。
  • 依赖:Redis(存储请求状态)、Spring Boot(示例用,其他框架同理)。

方案 2:内存缓存 + FutureTask(单实例场景)

  • 适用场景:单服务实例、低并发(无需跨进程)。
  • 优势:无外部依赖、性能高。
  • 劣势:服务重启后状态丢失,不支持分布式。

三、方案 1:Redis + CountDownLatch(分布式场景)

1. 架构设计

组件 作用
业务服务 发起短信请求、阻塞等待、接收回调
Redis 存储请求状态(等待 / 成功 / 失败)、阻塞标识
短信平台 发送短信、触发回调
CountDownLatch 主线程阻塞工具(计数为 1,回调后减 1)

2. 核心步骤

步骤 1:生成唯一请求 ID

用 UUID 作为短信请求的唯一标识,贯穿整个流程。

步骤 2:发起短信请求并阻塞
  • 调用短信平台 API,传入请求 ID(作为回调参数)。
  • 用 CountDownLatch 阻塞主线程,等待回调通知。
  • 同时设置 Redis 键值对,记录请求状态(如 sms:status:{requestId} = “WAITING”)。
步骤 3:处理短信平台回调
  • 短信平台触发回调接口,携带请求 ID 和送达状态(成功 / 失败)。
  • 回调接口更新 Redis 状态(如 sms:status:{requestId} = “SUCCESS”)。
  • 调用 CountDownLatch.countDown(),唤醒阻塞的主线程。
步骤 4:主线程获取结果
  • 主线程被唤醒后,从 Redis 读取最终状态,返回给前端。
  • 超时未收到回调时,返回 “短信发送中,请稍后查询” 或 “发送超时”。

3. 代码实现(Spring Boot 示例)

3.1 依赖配置(pom.xml)
1
2
3
4
5
6
7
8
9
10
11
<!-- Redis 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 短信 SDK(以阿里云为例) -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dysmsapi20170525</artifactId>
<version>2.0.24</version>
</dependency>
3.2 Redis 配置(application.yml)
1
2
3
4
5
6
spring:
redis:
host: 127.0.0.1
port: 6379
password: your-redis-password
timeout: 3000
3.3 核心服务类(SmsService)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import com.aliyun.dysmsapi20170525.Client;
import com.aliyun.dysmsapi20170525.models.SendSmsRequest;
import com.aliyun.teaopenapi.models.Config;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Service
public class SmsService {
// Redis 过期时间(需大于短信平台回调超时时间,如 60 秒)
private static final long REDIS_EXPIRE_SECONDS = 60;
// 阻塞超时时间(如 30 秒,避免无限阻塞)
private static final long BLOCK_TIMEOUT_SECONDS = 30;

@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private Client smsClient; // 阿里云短信客户端(初始化见下文)

/** * 同步发送短信(阻塞等待结果) * @param phone 手机号 * @param templateCode 模板编码 * @param templateParam 模板参数(JSON 格式) * @return 发送结果(SUCCESS/FAIL/TIMEOUT) */
public String sendSmsSync(String phone, String templateCode, String templateParam) {
// 1. 生成唯一请求 ID
String requestId = UUID.randomUUID().toString().replace("-", "");
String statusKey = "sms:status:" + requestId;
String latchKey = "sms:latch:" + requestId;

try {
// 2. 初始化 CountDownLatch(计数 1)
CountDownLatch latch = new CountDownLatch(1);
// 3. 存入 Redis(状态:等待,阻塞工具)
redisTemplate.opsForValue().set(statusKey, "WAITING", REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS);
redisTemplate.opsForValue().set(latchKey, latch.toString(), REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS); // 实际项目中需序列化存储

// 4. 发起短信请求(传入 requestId 作为回调参数)
SendSmsRequest smsRequest = new SendSmsRequest()
.setPhoneNumbers(phone)
.setTemplateCode(templateCode)
.setTemplateParam(templateParam)
.setOutId(requestId); // 短信平台回调时会返回该参数
smsClient.sendSms(smsRequest);

// 5. 阻塞等待回调(超时时间 30 秒)
boolean isSuccess = latch.await(BLOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!isSuccess) {
// 超时未收到回调
redisTemplate.opsForValue().set(statusKey, "TIMEOUT", REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS);
return "TIMEOUT";
}

// 6. 从 Redis 获取最终状态
String finalStatus = redisTemplate.opsForValue().get(statusKey);
return finalStatus != null ? finalStatus : "UNKNOWN";

} catch (Exception e) {
// 异常处理(如短信平台调用失败)
redisTemplate.opsForValue().set(statusKey, "FAIL", REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS);
return "FAIL";
} finally {
// 7. 清理 Redis 键(可选,避免占用空间)
redisTemplate.delete(statusKey);
redisTemplate.delete(latchKey);
}
}

/** * 短信平台回调接口(需暴露给外网) * @param requestId 唯一请求 ID * @param status 送达状态(SUCCESS/FAIL) */
public void handleSmsCallback(String requestId, String status) {
String statusKey = "sms:status:" + requestId;
String latchKey = "sms:latch:" + requestId;

try {
// 1. 更新 Redis 状态
redisTemplate.opsForValue().set(statusKey, status, REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS);
// 2. 获取 CountDownLatch 并唤醒主线程
CountDownLatch latch = (CountDownLatch) redisTemplate.opsForValue().get(latchKey); // 实际项目中需反序列化
if (latch != null) {
latch.countDown();
}
} catch (Exception e) {
// 回调异常处理(如日志记录)
}
}

/** * 初始化阿里云短信客户端(示例) */
public static Client createSmsClient() throws Exception {
Config config = new Config()
.setAccessKeyId("your-access-key-id")
.setAccessKeySecret("your-access-key-secret");
config.endpoint = "dysmsapi.aliyuncs.com";
return new Client(config);
}
}
3.4 控制器(暴露接口)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/sms")
public class SmsController {

@Autowired
private SmsService smsService;

/** * 同步发送短信接口(供前端调用) */
@PostMapping("/send")
public String sendSms(@RequestParam String phone,
@RequestParam String templateCode,
@RequestParam String templateParam) {
return smsService.sendSmsSync(phone, templateCode, templateParam);
}

/** * 短信平台回调接口(需配置到短信平台) */
@GetMapping("/callback")
public String handleCallback(@RequestParam String outId, // 对应发送时的 outId(requestId)
@RequestParam String status) { // 短信平台返回的状态(如 DELIVERED/SUCCESS)
// 转换状态(根据短信平台文档映射)
String finalStatus = "DELIVERED".equals(status) ? "SUCCESS" : "FAIL";
smsService.handleSmsCallback(outId, finalStatus);
return "success";
}
}

四、方案 2:内存缓存 + FutureTask(单实例场景)

1. 核心思路

用 ConcurrentHashMap 存储 FutureTask(线程安全的异步任务结果),发起短信后阻塞等待 FutureTask 的结果,回调时设置结果并唤醒线程。

2. 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;

@Service
public class SmsService {
// 线程安全的内存缓存(存储 requestId -> FutureTask)
private final Map<String, FutureTask<String>> taskCache = new ConcurrentHashMap<>();
// 线程池(处理短信发送异步任务,可选)
private final ExecutorService executor = Executors.newFixedThreadPool(10);

/** * 同步发送短信 */
public String sendSmsSync(String phone, String templateCode, String templateParam) throws ExecutionException, InterruptedException {
String requestId = UUID.randomUUID().toString();
// 创建 FutureTask(用于存储结果和阻塞)
FutureTask<String> futureTask = new FutureTask<>(() -> {
// 这里是耗时操作(发起短信请求),实际可放在线程池执行
sendSmsAsync(phone, templateCode, templateParam, requestId);
// 阻塞等待回调设置结果(无超时,需在回调中唤醒)
return null;
});

// 存入缓存
taskCache.put(requestId, futureTask);
// 提交任务(或直接执行)
executor.submit(futureTask);

// 阻塞等待结果(超时时间 30 秒)
try {
return futureTask.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
taskCache.remove(requestId);
return "TIMEOUT";
}
}

/** * 异步发起短信请求(实际调用短信平台) */
private void sendSmsAsync(String phone, String templateCode, String templateParam, String requestId) {
try {
// 调用短信平台 API(传入 requestId 作为回调参数)
// 示例:smsClient.sendSms(...)
} catch (Exception e) {
// 异常时设置失败结果
FutureTask<String> futureTask = taskCache.get(requestId);
if (futureTask != null && !futureTask.isDone()) {
futureTask.setException(e);
}
taskCache.remove(requestId);
}
}

/** * 回调接口处理 */
public void handleSmsCallback(String requestId, String status) {
FutureTask<String> futureTask = taskCache.get(requestId);
if (futureTask != null && !futureTask.isDone()) {
// 设置结果(唤醒阻塞的主线程)
futureTask.set(status);
}
// 清理缓存
taskCache.remove(requestId);
}
}

五、关键注意事项

  1. 超时控制:必须设置阻塞超时(如 30 秒),避免主线程无限阻塞导致服务雪崩。
  2. 幂等性处理:短信平台可能重复回调(如网络重试),需确保回调接口幂等(多次调用不影响结果)。
  3. 分布式场景:单实例方案不支持多服务部署(缓存无法共享),必须用 Redis 等分布式存储。
  4. 异常处理
    • 短信平台调用失败(如参数错误、余额不足):直接返回失败,无需阻塞。
    • 回调接口异常(如 Redis 宕机):记录日志,后续通过定时任务查询短信状态(补偿机制)。
  5. 性能优化
    • 高并发场景下,阻塞线程会占用资源,建议控制并发数(如用 Semaphore 限流)。
    • Redis 操作尽量用 pipeline 或批量命令,减少网络开销。
  6. 状态补偿:对于超时的请求,可通过定时任务查询短信平台的最终状态(如调用短信平台的 “查询短信状态” API),避免结果丢失。

六、总结

  • 分布式场景:优先选择「Redis + CountDownLatch」,支持高可用和跨进程共享。
  • 单实例场景:用「内存缓存 + FutureTask」,简单高效但不支持分布式。
  • 核心保障:超时控制、幂等性、异常补偿是确保系统稳定的关键。