数据库理解
- dpo_import : 保存的是所有待分发数据
- distribute_rule:分发规则,每个TM(team)一个规则
- rule_info:对应distribute_rule的细分规则
- distribute_log:数据分发日志,记录每天数据分发情况
- distribute_count:分发统计
数据库ER图
distribute_rule:分发规则
test1规则
test2规则
需求理解
业务需求
1.每天零点对dpo_import数据,按照分发规则(distribute_rule)进行分发
2.记录分发日志(distribute_log)
3.统计下每天每个TM(每个TM每天一条统计数据),根据每个规则分发了多少数据(distribute_count)
简易分析流程
零:每天根据start_date生效时间 更新status状态 没到时间就是待生效,到时间就是生效中 过期就是某时间
一、定时任务:每天根据生效时间更新生效状态零点调用 查询(enable已启用 且 在生效时间status=1)的TM规则列表 调用分发函数(传一个规则) 修改当前status状态(1:生效中)
二、分发函数:按照分发规则分发,分发结束后,记录日志,统计今天的每个规则分发了的数据
三、按规则分发:
- 1.拿到该规则需要分发的人数
- 2.按照分发规则筛选符合条件且未分发(distribute_status = 0)的人数,取出前需要分发的人数(规则人数)
- 3.插入日志(distribute_log)(gift_id、tm_id、rule_id、rule_item_id、created_by、updated_by)
四:调用统计规则函数:当一个TM分发规则完成后,在给统计表插入(tm_id、rule_id、rule_items(rule_info.id)、rule_info.id、distribute_total(分发总数量)、distribute_detail(分发数据明细)、stamp(DC预分配TM当天时间)、创建人员、更新人员)
简易分析流程图
distrbute_count表
rule_items表示详细规则编号, detail表示具体分发了多少数据
distribute_log表
代码流程
定时任务-> 找到已启动且激活的规则列表 -> 根据规则id在 详细规则表中找到 动态的条件 -> 拿到条件筛选出要分发的名单 -> 记录人员分发日志 -> 统计TM规则分发数量日志 -> 打印分发运行时间
1.写一个定时任务
dpoImportDistributionTaskService里面包含了所有业务操作
@Slf4j
@Component
public class DistributeTasks {
@Resource
private DpoImportDistributionTaskService dpoImportDistributionTaskService;
// 定义定时任务方法,每天零点执行
@Scheduled(cron = "0 0 0 * * ?")
public void distributionTask() {
try {
long tm1 = System.currentTimeMillis();
dpoImportDistributionTaskService.scheduledDistribution();
long tm2 = System.currentTimeMillis();
log.debug(String.format("distributionTask 耗时-> 执行: %d", tm2 - tm1));
} catch (Exception ex) {
log.warn("distributionTask", ex);
}
}
}
2.两次遍历循环找到 规则列表 和 详细规则动态查询条件 并记录日志
类型转化的工具类参考动态规则查询返回Object类转化成DpoImport类, 逐步优化转为工具类
@Slf4j
@Service
public class DpoImportDistributionTaskServiceImpl implements DpoImportDistributionTaskService {
@Resource
private DistributeRuleService distributeRuleService;
@Resource
private DpoImportService dpoImportService;
@Resource
private RuleInfoService ruleInfoService;
@Resource
private DistributeLogService distributeLogService;
@Resource
private DistributionCountService distributionCountService;
@Override
public void scheduledDistribution() throws Exception {
// 得到TM规则
List<DistributeRule> tmRules = distributeRuleService.getEnabledTMRules();
// 遍历TM规则
for (DistributeRule distributeRule : tmRules) {
// 根据TM规则id获得 规则列表
List<RuleInfo> ruleInfoList = ruleInfoService.getRuleInfoByRuleId(distributeRule.getTmId());
// TM规则总分发数量
Integer total = distributeRule.getTotal();
// 记录每个规则分发的数量
List<Integer> distributionNumberList = new ArrayList<>();
for (RuleInfo rule : ruleInfoList) {
// 取出RuleItem规则
String ruleItemsJson = rule.getRuleItems();
// JSON转对象
ObjectMapper objectMapper = new ObjectMapper();
// 查询条件
RuleItem[] ruleItems = objectMapper.readValue(ruleItemsJson, RuleItem[].class);
// 得到该规则之一 id拿到的 分发人员列表
DynResult importListByRuleInfo = dpoImportService.getImportListByRuleInfo(ruleItems, rule);
List<DpoImport> dpoImports = FieldNameConverterHelper.convertToClass(importListByRuleInfo.getItems(), DpoImport.class);
// 只要分发数量没有超过TM总数量
if ((total -= dpoImports.size()) < 0) continue;
// 将分发规则存在列表
distributionNumberList.add(dpoImports.size());
// 进行分发
int update = dpoImportService.distributionDpoImport(dpoImports);
if (update >= 0) {
// 记录Log日志
distributeLogService.insertLog(dpoImports, distributeRule, rule);
}
}
// 记录count日志
distributionCountService.TmDistribution(distributeRule, ruleInfoList, distributionNumberList);
// 清空分发数量统计表
distributionNumberList.clear();
}
}
}
3.DistributeRuleService
@RemoteService
public interface DistributeRuleService {
/**
* 获取已启用且规则生效中的TM规则列表
* @return TM列表
*/
List<DistributeRule> getEnabledTMRules();
}
@Slf4j
@Service
public class DistributeRuleServiceImpl implements DistributeRuleService {
@Autowired
private Configuration configuration;
@Override
public List<DistributeRule> getEnabledTMRules() {
try {
DSLContext create = DSL.using(configuration);
return create.selectFrom(Tables.DISTRIBUTE_RULE)
.where(Tables.DISTRIBUTE_RULE.ENABLE.eq(1))
.and(Tables.DISTRIBUTE_RULE.STATUS.eq(1))
.fetchInto(DistributeRule.class);
} catch (Exception ex) {
log.warn("getEnabledTMRules", ex);
return null;
}
}
}
4.DpoImportService
@RemoteService
public interface DpoImportService {
/**
* 根据传来的筛选规则 查询dpo表
* @param ruleItems 详细规则
* @param rule 规则列
* @return dpo表
*/
DynResult getImportListByRuleInfo(RuleItem[] ruleItems, RuleInfo rule);
/**
* 分发操作并
* @param dpoImportList dpo客户表
*/
int distributionDpoImport(List<DpoImport> dpoImportList);
}
@Slf4j
@Service
public class DpoImportServiceImpl implements DpoImportService {
@Autowired
private Configuration configuration;
@Autowired
private DynamicEntityFilterService dynamicEntityFilterService;
@Override
public DynResult getImportListByRuleInfo(RuleItem[] ruleItems, RuleInfo rule) {
Gson gson = new Gson();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
List<DynQuery.Filter> filters = new ArrayList<>();
DynQuery dynQuery = new DynQuery();
for (int i = 0; i < ruleItems.length; i ++) {
String check = ruleItems[i].getCheck();
boolean checkable = ruleItems[i].isCheckable();
String filterValue;
if (checkable && check.equals("=")) {
check = "in";
List<String> value = Arrays.asList(gson.fromJson(ruleItems[i].getValue().toString(), String[].class));
filterValue = StringUtils.join(value, ",");
} else if (check.equals("excludeBetween")) {
StringBuilder stringBuilder = new StringBuilder();
for (String s : ruleItems[i].getValue().toString().split("~")) {
stringBuilder.append(sdf.format(new Date(Long.parseLong(s)))).append("~");
}
filterValue = stringBuilder.substring(0, stringBuilder.toString().length() - 1);
} else if (check.equals("between")) {
filterValue = ruleItems[i].getValue() + "~" + ruleItems[i].getValue1();
} else {
filterValue = ruleItems[i].getValue() == null ? "" : ruleItems[i].getValue().toString();
}
filters.add(new DynQuery.Filter(ruleItems[i].getField(), check, filterValue, i == 0 ? "and" : rule.getOperator()));
}
dynQuery.setFilters(filters);
dynQuery.setSize(rule.getNumber());
DynResult dpoImport = dynamicEntityFilterService.select("dpoImport", dynQuery);
log.info("Performing dynamic entity select: dpoImport, dynQuery: {}", dynQuery);
return dpoImport;
}
@Override
public int distributionDpoImport(List<DpoImport> dpoImportList) {
try {
DSLContext create = DSL.using(configuration);
// 分发操作...........
List<Long> idList = new ArrayList<>();
for (int i = 0; i < dpoImportList.size(); i++) {
DpoImport dpoImport = dpoImportList.get(i);
idList.add(dpoImport.getId());
}
return create.update(Tables.DPO_IMPORT)
.set(Tables.DPO_IMPORT.DISTRIBUTE_STATUS, 1)// 已分发: 1 未分发: 0
.where(Tables.DPO_IMPORT.ID.in(idList))
.execute();
} catch (Exception ex) {
log.warn("distributionDpoImport", ex);
return -1;
}
}
}
5.RuleInfoService
@RemoteService
public interface RuleInfoService {
/**
* 根据 id获取详细细腻
* @param id 编号
* @return 相信规则列表
*/
List<RuleInfo> getRuleInfoById(Long id);
/**
* 根据TM规则 id获取详细细腻
* @param id 编号
* @return 相信规则列表
*/
List<RuleInfo> getRuleInfoByRuleId(Long id);
}
@Slf4j
@Service
public class RuleInfoServiceImpl implements RuleInfoService {
@Autowired
private Configuration configuration;
@Autowired
private DynamicEntityService dynamicEntityService;
@Override
public List<RuleInfo> getRuleInfoById(Long id) {
try {
RuleInfoDao ruleInfoDao = new RuleInfoDao(configuration);
return ruleInfoDao.fetchById(id);
} catch (Exception ex) {
log.warn("getRuleInfoById", ex);
return null;
}
}
@Override
public List<RuleInfo> getRuleInfoByRuleId(Long id) {
try {
DSLContext create = DSL.using(configuration);
return create.selectFrom(Tables.RULE_INFO)
.where(Tables.RULE_INFO.RULE_ID.eq(id))
.and(Tables.RULE_INFO.STATUS.eq(1))
.fetchInto(RuleInfo.class);
} catch (Exception ex) {
log.warn("getRuleInfoByRuleId", ex);
return null;
}
}
}
6.DistributeLogService
@RemoteService
public interface DistributeLogService {
/**
* 统计TM分发了多少数据
* @param dpoImportList dpo客户列表
* @param distributeRule TM规则表
* @param ruleInfo 详细规则行
*/
void insertLog(List<DpoImport> dpoImportList, DistributeRule distributeRule, RuleInfo ruleInfo);
}
@Slf4j
@Service
public class DistributeLogServiceImpl implements DistributeLogService {
@Resource
private Configuration configuration;
@Override
public void insertLog(List<DpoImport> dpoImportList, DistributeRule distributeRule, RuleInfo ruleInfo) {
// 插入到日志
try {
// 将捐赠单id列表转为字符串
for (int i = 0; i < dpoImportList.size(); i++) {
DistributeLog distributeLog = new DistributeLog();
// 捐赠单id 列表
distributeLog.setGiftId(dpoImportList.get(i).getGiftId());
// TM 标识
distributeLog.setTmId(1L);
// 规则 rule_id
distributeLog.setRuleId(distributeRule.getId());
// 规则 item_id
distributeLog.setRuleItemId(ruleInfo.getId());
// DC预分配TM时间
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = df.format(new Date());
distributeLog.setDistributeDate(Timestamp.valueOf(format));
// 创建人员
distributeLog.setCreatedBy(distributeRule.getCreatedBy());
// 修改人员
distributeLog.setUpdatedBy(distributeRule.getUpdatedBy());
// 插入日志
DistributeLogDao distributeLogDao = new DistributeLogDao(configuration);
distributeLogDao.insert(distributeLog);
}
} catch (Exception ex) {
log.warn("distributionLogDpoImport", ex);
}
}
}
7.DistributionCountService
@RemoteService
public interface DistributionCountService {
/**
* 统计下每天每个TM(每个TM每天一条统计数据),根据每个规则分发了多少数据(distribute_count)
* @param distributeRule 规则列
* @param ruleInfoList 详细规则
*/
void TmDistribution(DistributeRule distributeRule, List<RuleInfo> ruleInfoList, List<Integer> distributionNumberList);
}
@Slf4j
@Service
public class DistributionCountServiceImpl implements DistributionCountService {
@Autowired
private Configuration configuration;
@Override
public void TmDistribution(DistributeRule distributeRule, List<RuleInfo> ruleInfoList, List<Integer> distributionNumberList) {
// 插入到日志
try {
// 详细规则id
StringBuilder sb = new StringBuilder();
StringBuilder detail = new StringBuilder();
// 统计总分发条数
Integer total = 0;
for (int i = 0; i < ruleInfoList.size(); i++) {
sb.append(ruleInfoList.get(i).getId());
detail.append("规则").append(i + 1).append(": ").append(distributionNumberList.get(i)).append(";");
total += distributionNumberList.get(i);
if (i < ruleInfoList.size() - 1) {
sb.append(", ");
}
}
DistributeCount distributeCount = new DistributeCount();
// TM标识
distributeCount.setTmId(1L);
// 规则id
distributeCount.setRuleId(distributeRule.getId());
// 详细规则items id
distributeCount.setRuleItems(sb.toString());
//DC预分配TM数据量
distributeCount.setDistributeTotal(total);
// 分发数据明细
distributeCount.setDistributeDetail(detail.toString());
// DC预分配TM当天时间
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = df.format(new Date());
distributeCount.setStamp(Timestamp.valueOf(format));
// 创建人员
distributeCount.setCreatedBy(distributeRule.getCreatedBy());
// 修改人员
distributeCount.setUpdatedBy(distributeRule.getUpdatedBy());
DistributeCountDao distributeCountDao = new DistributeCountDao(configuration);
distributeCountDao.insert(distributeCount);
} catch (Exception ex) {
log.warn("distributionStatisticsCount", ex);
}
}
}
---------------------------- 解决BUG(12月15日) -----------------------------------
完善逻辑, 优化代码
- 增加定时任务:凌晨判断规则是否到有效时间 完成
- 查询规则带排序查找 完成
- DC分配时间改为 规则有效时间 完成
- 查询未分发的 完成
- 分页最多100 超过100 需要多次循环,限制规则条数 完成
- 记录日志设置TM ID编号 完成
- 完善动态查询方法 完成
- Map代替List解决数组越界问题 完成
DpoImportDistributionTaskServiceImpl
@Slf4j
@Service
public class DpoImportDistributionTaskServiceImpl implements DpoImportDistributionTaskService {
@Resource
private DistributeRuleService distributeRuleService;
@Resource
private DpoImportService dpoImportService;
@Resource
private RuleInfoService ruleInfoService;
@Resource
private DistributeLogService distributeLogService;
@Resource
private DistributionCountService distributionCountService;
@Override
public void scheduledDistribution() throws Exception {
// 得到TM规则
List<DistributeRule> tmRules = distributeRuleService.getEnabledTMRules(1, 1);
// 遍历TM规则
for (DistributeRule distributeRule : tmRules) {
// 根据TM规则id获得 规则列表
List<RuleInfo> ruleInfoList = ruleInfoService.getRuleInfoByRuleId(distributeRule.getId(), 1);
// TM规则总分发数量
Integer total = distributeRule.getTotal();
// 记录每个规则分发的数量
List<Integer> distributionNumberList = new ArrayList<>();
// <rule.id, dpoImports.size()> 存分发数量列表
Map<Long, Integer> map = new HashMap<>();
for (RuleInfo rule : ruleInfoList) {
// 取出RuleItem规则
String ruleItemsJson = rule.getRuleItems();
// JSON转对象
ObjectMapper objectMapper = new ObjectMapper();
// 查询条件
RuleItem[] ruleItems = objectMapper.readValue(ruleItemsJson, RuleItem[].class);
Integer size = rule.getNumber();
// 查询dpo表一次最多查询100条, 规则条数不能超过TM规则设置的总条数
while (size > 0 && total - size >= 0) {
// 计算当前循环应查询的批次的大小
int batchSize = Math.min(size, 100);
DynResult importListByRuleInfo = dpoImportService.getImportListByRuleInfo(ruleItems, rule, batchSize);
List<DpoImport> dpoImports = FieldNameConverterHelper.convertToClass(importListByRuleInfo.getItems(), DpoImport.class);
// 将分发规则数量存在列表
updateDistributionMap(map, rule.getId(), dpoImports.size());
// 进行分发
int update = dpoImportService.distributionDpoImport(dpoImports, 1);
if (update >= 0) {
// 记录Log日志
distributeLogService.insertLog(dpoImports, distributeRule, rule);
}
size -= batchSize;
total -= batchSize;
}
}
// 记录count日志
distributionCountService.TmDistribution(distributeRule, map, distributionNumberList);
// 清空分发数量统计表
distributionNumberList.clear();
}
}
public void updateDistributionMap(Map<Long, Integer> map, Long id, int count) {
if (map.containsKey(id)) {
int existingValue = map.get(id);
map.put(id, existingValue + count);
} else {
map.put(id, count);
}
}
@Override
public void checkAndUpdateRuleStatus(Integer status) {
List<DistributeRule> pendingRules = distributeRuleService.getPendingRules();
ArrayList<Long> ruleIds = new ArrayList<>();
// 获取当前时间戳
long currentDate = System.currentTimeMillis();
for (DistributeRule pendingRule : pendingRules) {
String startDate = pendingRule.getStartDate();
if (currentDate > Long.parseLong(startDate)) ruleIds.add(pendingRule.getId());
}
// 传一个ids批量更新TM规则时间
distributeRuleService.updateRulesByIds(ruleIds, status); // 1: 规则生效
}
}
改动太多,暂时传一个主流程,埋坑以后再填!
评论区