使用Redis结合aop提取用户的行为数据

场景描述

在项目中有这样一个需求,用户下载app并打开进入首页之前,会让用户选择自己感兴趣的主题分类,后台根据用户的选择提取出用户的行为数据并作出统计,以图形或者表格的形式展现出来,后期就可以根据这些数据做去做一些类似个性化或精准化的推送了,例如在进入豆瓣app首页前提示用户先选择感兴趣的主题内容.
image.png

具体实现

由于项目中主要使用了Spring框架,所以首先想到的最简单的方式就是使用aop切面拦截用户选择主题分类的接口,然后在数据库中建一张单独的表t_user_interest来存储用户的行为数据,由于是统计型数据所以实时性要求不高,允许有一定的延迟,并且为避免频繁的操作数据库,在切面拦截数据后起独立的线程推送到redis队列,后台再起调度任务每隔1小时异步持久化到数据库中.下面来看看实现步骤:
1.定义aop拦截器拦截方法调用,把用户行为数据推送到redis队列中,其中TaskPo为封装好的与用户相关的行为数据对象.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* aop拦截器拦截方法调用
*/
@Component
@Aspect
public class UserInterestAspect {

private final static Logger logger = LoggerFactory.getLogger(UserInterestAspect.class);

private final LinkedBlockingQueue<TaskPo> queue = new LinkedBlockingQueue<>();

private final static ExecutorService pool = Executors.newFixedThreadPool(5);

private final static String QUEUE_NAME = "USER_INTEREST";

/**
* 前置通知:在目标方法开始之前执行
*
* @param joinPoint
* @throws Exception
*/
@Before("execution(* xxx.service.impl.xxxServiceImpl.xxx(..))")
public void saveRequiresLog(JoinPoint joinPoint) throws Exception {
try {
TaskPo taskPo = getTaskPo(joinPoint);
if (taskPo != null) {
queue.add(taskPo);
TaskPo taskMsge;
while ((taskMsge = queue.poll()) != null) {
pool.execute(new PushRedisWorker(QUEUE_NAME, taskMsge));
}
}
} catch (Exception e) {
throw e;
}
}

/**
* 解析参数映射为Java对象
*
* @param joinPoint
* @return
*/
public TaskPo getTaskPo(JoinPoint joinPoint) {
Object[] paramsValue = joinPoint.getArgs();
String[] paramsName = ((CodeSignature) joinPoint.getStaticPart()
.getSignature()).getParameterNames();
JSONObject entityJSON = new JSONObject();
for (int i = 0; i < paramsName.length; i++) {
entityJSON.put(paramsName[i], paramsValue[i]);
}
if (entityJSON != null && !entityJSON.isEmpty()) {
TaskPo taskPo = JSONObject.toJavaObject(entityJSON, TaskPo.class);
taskPo.setOperatorDate(new Date());
return taskPo;
}
return null;
}
}

2.接着起一个任务线程接口worker继承Runnable.

1
2
3
public interface Worker extends Runnable {

}

3.然后创建一个PushRedisWorker类实现Worker接口,并将封装好的数据写到redis队列中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class PushRedisWorker implements Worker {

private String QUEUE_NAME;

private TaskPo taskPo;

public PushRedisWorker(String QUEUE_NAME, TaskPo taskPo) {
this.QUEUE_NAME = QUEUE_NAME;
this.taskPo = taskPo;
}

@Override
public void run() {
// TODO Auto-generated method stub
try (Jedis jedis = JedisUtils.getJedis()) {
jedis.lpush(QUEUE_NAME, JSON.toJSONString(taskPo));
}
}
}

4.然后再定义一个调度任务器类每隔一段时间间隔轮询redis任务队列,不断的从队列中消费数据,再结合线程池异步发送数据到mysql中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* 任务调度定时到Redis队列中拉取对象持久化到数据库
*/
@Component
public class RedisTaskJob {

private final static Logger logger = LoggerFactory.getLogger(RedisTaskJob.class);

private final static String QUEUE_NAME = "USER_INTEREST";

private final static ExecutorService es = Executors.newFixedThreadPool(5);

private volatile static boolean isRun = true;

/**
* 每隔一小时执行一次
*/
@Scheduled(cron = "0 0 0/1 * * ? ")
public void getRedisTask() {
if (logger.isDebugEnabled()) {
logger.debug("调度开始");
}
//取出spring受管的业务service实例
IUserInterestService userInterestService = SpringContextHolder.getBean("userInterestService",IUserInterestService.class);
try (Jedis jedis = JedisUtils.getJedis()) {
if (jedis.exists(QUEUE_NAME)) {
start();
}
while (isRun) {
if (!jedis.exists(QUEUE_NAME)) {
stop();
break;
}
try {
String task = jedis.lpop(QUEUE_NAME);
if (StringHelpUtils.isNotBlank(task)) {
TaskPo taskPo = JSONObject.toJavaObject(JSON.parseObject(task), TaskPo.class);
//任务对象转化为实体model
UserInterest model = EntityUtils.convert(taskPo,UserInterest.class);
es.submit(() -> {
//执行数据入库操作
userInterestService.save(model);
});
}
} catch (Exception e) {
stop();
throw e;
}
}
}
}

public static void stop() {
isRun = false;
}

public static void start() {
isRun = true;
}
}

5.最后统计mysql中的数据并生成报表.