目 录CONTENT

文章目录

分发业务理解及实现(12月15日更新)

不争
2024-01-02 / 0 评论 / 0 点赞 / 69 阅读 / 39318 字

数据库理解

  1. dpo_import : 保存的是所有待分发数据
  2. distribute_rule:分发规则,每个TM(team)一个规则
  3. rule_info:对应distribute_rule的细分规则
  4. distribute_log:数据分发日志,记录每天数据分发情况
  5. distribute_count:分发统计

数据库ER图

distribute_rule:分发规则

test1规则

test2规则

需求理解

业务需求

1.每天零点对dpo_import数据,按照分发规则(distribute_rule)进行分发
2.记录分发日志(distribute_log)
3.统计下每天每个TM(每个TM每天一条统计数据),根据每个规则分发了多少数据(distribute_count)

简易分析流程

零:每天根据start_date生效时间 更新status状态 没到时间就是待生效,到时间就是生效中 过期就是某时间

一、定时任务:每天根据生效时间更新生效状态零点调用 查询(enable已启用 且 在生效时间status=1)的TM规则列表 调用分发函数(传一个规则) 修改当前status状态(1:生效中)

二、分发函数:按照分发规则分发,分发结束后,记录日志,统计今天的每个规则分发了的数据

三、按规则分发:

  • 1.拿到该规则需要分发的人数
  • 2.按照分发规则筛选符合条件且未分发(distribute_status = 0)的人数,取出前需要分发的人数(规则人数)
  • 3.插入日志(distribute_log)(gift_id、tm_id、rule_id、rule_item_id、created_by、updated_by)

四:调用统计规则函数:当一个TM分发规则完成后,在给统计表插入(tm_id、rule_id、rule_items(rule_info.id)、rule_info.id、distribute_total(分发总数量)、distribute_detail(分发数据明细)、stamp(DC预分配TM当天时间)、创建人员、更新人员)

简易分析流程图

distrbute_count表

rule_items表示详细规则编号, detail表示具体分发了多少数据

distribute_log表

distribute_log表

代码流程

定时任务-> 找到已启动且激活的规则列表 -> 根据规则id在 详细规则表中找到 动态的条件 -> 拿到条件筛选出要分发的名单 -> 记录人员分发日志 -> 统计TM规则分发数量日志 -> 打印分发运行时间

1.写一个定时任务

dpoImportDistributionTaskService里面包含了所有业务操作

@Slf4j
@Component
public class DistributeTasks {
    @Resource
    private DpoImportDistributionTaskService dpoImportDistributionTaskService;
    // 定义定时任务方法,每天零点执行
    @Scheduled(cron = "0 0 0 * * ?")
    public void distributionTask() {

        try {
            long tm1 = System.currentTimeMillis();
            dpoImportDistributionTaskService.scheduledDistribution();
            long tm2 = System.currentTimeMillis();
            log.debug(String.format("distributionTask 耗时-> 执行: %d", tm2 - tm1));
        } catch (Exception ex) {
            log.warn("distributionTask", ex);
        }
    }
}

2.两次遍历循环找到 规则列表 和 详细规则动态查询条件 并记录日志

类型转化的工具类参考动态规则查询返回Object类转化成DpoImport类, 逐步优化转为工具类

@Slf4j
@Service
public class DpoImportDistributionTaskServiceImpl implements DpoImportDistributionTaskService {
    @Resource
    private DistributeRuleService distributeRuleService;

    @Resource
    private DpoImportService dpoImportService;

    @Resource
    private RuleInfoService ruleInfoService;
    @Resource
    private DistributeLogService distributeLogService;

    @Resource
    private DistributionCountService distributionCountService;

    @Override
    public void scheduledDistribution() throws Exception {
        // 得到TM规则
        List<DistributeRule> tmRules = distributeRuleService.getEnabledTMRules();
        // 遍历TM规则
        for (DistributeRule distributeRule : tmRules) {
            // 根据TM规则id获得 规则列表
            List<RuleInfo> ruleInfoList = ruleInfoService.getRuleInfoByRuleId(distributeRule.getTmId());
            // TM规则总分发数量
            Integer total = distributeRule.getTotal();
            // 记录每个规则分发的数量
            List<Integer> distributionNumberList = new ArrayList<>();

            for (RuleInfo rule : ruleInfoList) {
                // 取出RuleItem规则
                String ruleItemsJson = rule.getRuleItems();
                // JSON转对象
                ObjectMapper objectMapper = new ObjectMapper();
                // 查询条件
                RuleItem[] ruleItems = objectMapper.readValue(ruleItemsJson, RuleItem[].class);
                // 得到该规则之一 id拿到的 分发人员列表
                DynResult importListByRuleInfo = dpoImportService.getImportListByRuleInfo(ruleItems, rule);

                List<DpoImport> dpoImports = FieldNameConverterHelper.convertToClass(importListByRuleInfo.getItems(), DpoImport.class);
                // 只要分发数量没有超过TM总数量
                if ((total -= dpoImports.size()) < 0) continue;
                // 将分发规则存在列表
                distributionNumberList.add(dpoImports.size());
                // 进行分发
                int update = dpoImportService.distributionDpoImport(dpoImports);

                if (update >= 0) {
                    // 记录Log日志
                    distributeLogService.insertLog(dpoImports, distributeRule, rule);
                }
            }
            // 记录count日志
            distributionCountService.TmDistribution(distributeRule, ruleInfoList, distributionNumberList);
            // 清空分发数量统计表
            distributionNumberList.clear();
        }
    }
}

3.DistributeRuleService

@RemoteService
public interface DistributeRuleService {
    /**
     * 获取已启用且规则生效中的TM规则列表
     * @return TM列表
     */
    List<DistributeRule> getEnabledTMRules();
}

@Slf4j
@Service
public class DistributeRuleServiceImpl implements DistributeRuleService {

    @Autowired
    private Configuration configuration;
    @Override
    public List<DistributeRule> getEnabledTMRules() {
        try {
            DSLContext create = DSL.using(configuration);

            return create.selectFrom(Tables.DISTRIBUTE_RULE)
                    .where(Tables.DISTRIBUTE_RULE.ENABLE.eq(1))
                    .and(Tables.DISTRIBUTE_RULE.STATUS.eq(1))
                    .fetchInto(DistributeRule.class);
        } catch (Exception ex) {
            log.warn("getEnabledTMRules", ex);
            return null;
        }

    }
}

4.DpoImportService

@RemoteService
public interface DpoImportService {
    /**
     *  根据传来的筛选规则 查询dpo表
     * @param ruleItems 详细规则
     * @param rule 规则列
     * @return dpo表
     */
    DynResult getImportListByRuleInfo(RuleItem[] ruleItems, RuleInfo rule);

    /**
     * 分发操作并
     * @param dpoImportList dpo客户表
     */
    int distributionDpoImport(List<DpoImport> dpoImportList);
}

@Slf4j
@Service
public class DpoImportServiceImpl implements DpoImportService {
    @Autowired
    private Configuration configuration;
    @Autowired
    private DynamicEntityFilterService dynamicEntityFilterService;


    @Override
    public DynResult getImportListByRuleInfo(RuleItem[] ruleItems, RuleInfo rule) {


        Gson gson = new Gson();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        List<DynQuery.Filter> filters = new ArrayList<>();
        DynQuery dynQuery = new DynQuery();

        for (int i = 0; i < ruleItems.length; i ++) {
            String check = ruleItems[i].getCheck();
            boolean checkable = ruleItems[i].isCheckable();
            String filterValue;
            if (checkable && check.equals("=")) {
                check = "in";
                List<String> value = Arrays.asList(gson.fromJson(ruleItems[i].getValue().toString(), String[].class));
                filterValue = StringUtils.join(value, ",");
            } else if (check.equals("excludeBetween")) {
                StringBuilder stringBuilder = new StringBuilder();
                for (String s : ruleItems[i].getValue().toString().split("~")) {
                    stringBuilder.append(sdf.format(new Date(Long.parseLong(s)))).append("~");
                }
                filterValue = stringBuilder.substring(0, stringBuilder.toString().length() - 1);
            } else if (check.equals("between")) {
                filterValue = ruleItems[i].getValue() + "~" + ruleItems[i].getValue1();
            } else {
                filterValue = ruleItems[i].getValue() == null ? "" : ruleItems[i].getValue().toString();
            }

            filters.add(new DynQuery.Filter(ruleItems[i].getField(), check, filterValue, i == 0 ? "and" : rule.getOperator()));

        }
        dynQuery.setFilters(filters);
        dynQuery.setSize(rule.getNumber());
        DynResult dpoImport = dynamicEntityFilterService.select("dpoImport", dynQuery);
        log.info("Performing dynamic entity select: dpoImport, dynQuery: {}", dynQuery);

        return  dpoImport;
    }


    @Override
    public int distributionDpoImport(List<DpoImport> dpoImportList) {

        try {
            DSLContext create = DSL.using(configuration);
            // 分发操作...........
            List<Long> idList = new ArrayList<>();
            for (int i = 0; i < dpoImportList.size(); i++) {
                DpoImport dpoImport = dpoImportList.get(i);
                idList.add(dpoImport.getId());
            }

            return  create.update(Tables.DPO_IMPORT)
                    .set(Tables.DPO_IMPORT.DISTRIBUTE_STATUS, 1)// 已分发: 1 未分发: 0
                    .where(Tables.DPO_IMPORT.ID.in(idList))
                    .execute();


        } catch (Exception ex) {
            log.warn("distributionDpoImport", ex);
            return -1;
        }
    }
}

5.RuleInfoService

@RemoteService
public interface RuleInfoService {
    /**
     * 根据 id获取详细细腻
     * @param id 编号
     * @return 相信规则列表
     */
    List<RuleInfo> getRuleInfoById(Long id);

    /**
     * 根据TM规则 id获取详细细腻
     * @param id 编号
     * @return 相信规则列表
     */
    List<RuleInfo> getRuleInfoByRuleId(Long id);
}

@Slf4j
@Service
public class RuleInfoServiceImpl implements RuleInfoService {

    @Autowired
    private Configuration configuration;
    @Autowired
    private DynamicEntityService dynamicEntityService;
    @Override
    public List<RuleInfo> getRuleInfoById(Long id) {
        try {
            RuleInfoDao ruleInfoDao = new RuleInfoDao(configuration);
            return ruleInfoDao.fetchById(id);
        } catch (Exception ex) {
            log.warn("getRuleInfoById", ex);
            return null;
        }
    }

    @Override
    public List<RuleInfo> getRuleInfoByRuleId(Long id) {
        try {
            DSLContext create = DSL.using(configuration);

            return create.selectFrom(Tables.RULE_INFO)
                    .where(Tables.RULE_INFO.RULE_ID.eq(id))
                    .and(Tables.RULE_INFO.STATUS.eq(1))
                    .fetchInto(RuleInfo.class);
        } catch (Exception ex) {
            log.warn("getRuleInfoByRuleId", ex);
            return null;
        }
    }
}

6.DistributeLogService

@RemoteService
public interface DistributeLogService {
    /**
     * 统计TM分发了多少数据
     * @param dpoImportList dpo客户列表
     * @param distributeRule TM规则表
     * @param ruleInfo 详细规则行
     */
    void insertLog(List<DpoImport> dpoImportList, DistributeRule distributeRule, RuleInfo ruleInfo);
}

@Slf4j
@Service
public class DistributeLogServiceImpl implements DistributeLogService {
    @Resource
    private Configuration configuration;
    @Override
    public void insertLog(List<DpoImport> dpoImportList, DistributeRule distributeRule, RuleInfo ruleInfo) {
        // 插入到日志
        try {
            // 将捐赠单id列表转为字符串
            for (int i = 0; i < dpoImportList.size(); i++) {
                DistributeLog distributeLog = new DistributeLog();
                // 捐赠单id 列表
                distributeLog.setGiftId(dpoImportList.get(i).getGiftId());
                // TM 标识
                distributeLog.setTmId(1L);
                // 规则 rule_id
                distributeLog.setRuleId(distributeRule.getId());
                // 规则 item_id
                distributeLog.setRuleItemId(ruleInfo.getId());
                // DC预分配TM时间
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String format = df.format(new Date());
                distributeLog.setDistributeDate(Timestamp.valueOf(format));
                // 创建人员
                distributeLog.setCreatedBy(distributeRule.getCreatedBy());
                // 修改人员
                distributeLog.setUpdatedBy(distributeRule.getUpdatedBy());

                // 插入日志
                DistributeLogDao distributeLogDao = new DistributeLogDao(configuration);
                distributeLogDao.insert(distributeLog);
            }


        } catch (Exception ex) {
            log.warn("distributionLogDpoImport", ex);
        }
    }
}

7.DistributionCountService

@RemoteService
public interface DistributionCountService {
    /**
     * 统计下每天每个TM(每个TM每天一条统计数据),根据每个规则分发了多少数据(distribute_count)
     * @param distributeRule 规则列
     * @param ruleInfoList 详细规则
     */
    void TmDistribution(DistributeRule distributeRule, List<RuleInfo> ruleInfoList, List<Integer> distributionNumberList);
}

@Slf4j
@Service
public class DistributionCountServiceImpl implements DistributionCountService {

    @Autowired
    private Configuration configuration;

    @Override
    public void TmDistribution(DistributeRule distributeRule, List<RuleInfo> ruleInfoList, List<Integer> distributionNumberList) {

        // 插入到日志
        try {
            // 详细规则id
            StringBuilder sb = new StringBuilder();
            StringBuilder detail = new StringBuilder();
            // 统计总分发条数
            Integer total = 0;

            for (int i = 0; i < ruleInfoList.size(); i++) {
                sb.append(ruleInfoList.get(i).getId());
                detail.append("规则").append(i + 1).append(": ").append(distributionNumberList.get(i)).append(";");
                total += distributionNumberList.get(i);
                if (i < ruleInfoList.size() - 1) {
                    sb.append(", ");
                }
            }

            DistributeCount distributeCount = new DistributeCount();
            // TM标识
            distributeCount.setTmId(1L);
            // 规则id
            distributeCount.setRuleId(distributeRule.getId());
            // 详细规则items id
            distributeCount.setRuleItems(sb.toString());
            //DC预分配TM数据量
            distributeCount.setDistributeTotal(total);
            // 分发数据明细
            distributeCount.setDistributeDetail(detail.toString());
            // DC预分配TM当天时间
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String format = df.format(new Date());
            distributeCount.setStamp(Timestamp.valueOf(format));
            // 创建人员
            distributeCount.setCreatedBy(distributeRule.getCreatedBy());
            // 修改人员
            distributeCount.setUpdatedBy(distributeRule.getUpdatedBy());

            DistributeCountDao distributeCountDao = new DistributeCountDao(configuration);
            distributeCountDao.insert(distributeCount);
        } catch (Exception ex) {
            log.warn("distributionStatisticsCount", ex);
        }

    }

}


---------------------------- 解决BUG(12月15日) -----------------------------------

完善逻辑, 优化代码

  1. 增加定时任务:凌晨判断规则是否到有效时间 完成
  2. 查询规则带排序查找 完成
  3. DC分配时间改为 规则有效时间 完成
  4. 查询未分发的 完成
  5. 分页最多100 超过100 需要多次循环,限制规则条数 完成
  6. 记录日志设置TM ID编号 完成
  7. 完善动态查询方法 完成
  8. Map代替List解决数组越界问题 完成

DpoImportDistributionTaskServiceImpl

@Slf4j
@Service
public class DpoImportDistributionTaskServiceImpl implements DpoImportDistributionTaskService {
    @Resource
    private DistributeRuleService distributeRuleService;
    @Resource
    private DpoImportService dpoImportService;
    @Resource
    private RuleInfoService ruleInfoService;
    @Resource
    private DistributeLogService distributeLogService;
    @Resource
    private DistributionCountService distributionCountService;

    @Override
    public void scheduledDistribution() throws Exception {
        // 得到TM规则
        List<DistributeRule> tmRules = distributeRuleService.getEnabledTMRules(1, 1);
        // 遍历TM规则
        for (DistributeRule distributeRule : tmRules) {
            // 根据TM规则id获得 规则列表
            List<RuleInfo> ruleInfoList = ruleInfoService.getRuleInfoByRuleId(distributeRule.getId(), 1);
            // TM规则总分发数量
            Integer total = distributeRule.getTotal();
            // 记录每个规则分发的数量
            List<Integer> distributionNumberList = new ArrayList<>();
            // <rule.id, dpoImports.size()> 存分发数量列表
            Map<Long, Integer> map = new HashMap<>();
            for (RuleInfo rule : ruleInfoList) {
                // 取出RuleItem规则
                String ruleItemsJson = rule.getRuleItems();
                // JSON转对象
                ObjectMapper objectMapper = new ObjectMapper();
                // 查询条件
                RuleItem[] ruleItems = objectMapper.readValue(ruleItemsJson, RuleItem[].class);

                Integer size = rule.getNumber();
                // 查询dpo表一次最多查询100条, 规则条数不能超过TM规则设置的总条数
                while (size > 0 && total - size >= 0) {
                    // 计算当前循环应查询的批次的大小
                    int batchSize = Math.min(size, 100);

                    DynResult importListByRuleInfo = dpoImportService.getImportListByRuleInfo(ruleItems, rule, batchSize);

                    List<DpoImport> dpoImports = FieldNameConverterHelper.convertToClass(importListByRuleInfo.getItems(), DpoImport.class);

                   // 将分发规则数量存在列表
                    updateDistributionMap(map, rule.getId(), dpoImports.size());

                    // 进行分发
                    int update = dpoImportService.distributionDpoImport(dpoImports, 1);

                    if (update >= 0) {
                        // 记录Log日志
                        distributeLogService.insertLog(dpoImports, distributeRule, rule);
                    }
                    size -= batchSize;
                    total -= batchSize;
                }
            }
            // 记录count日志
            distributionCountService.TmDistribution(distributeRule, map, distributionNumberList);
            // 清空分发数量统计表
            distributionNumberList.clear();
        }
    }
    public void updateDistributionMap(Map<Long, Integer> map, Long id, int count) {
        if (map.containsKey(id)) {
            int existingValue = map.get(id);
            map.put(id, existingValue + count);
        } else {
            map.put(id, count);
        }
    }

    @Override
    public void checkAndUpdateRuleStatus(Integer status) {
        List<DistributeRule> pendingRules = distributeRuleService.getPendingRules();
        ArrayList<Long> ruleIds = new ArrayList<>();
        // 获取当前时间戳
        long currentDate = System.currentTimeMillis();
        for (DistributeRule pendingRule : pendingRules) {
            String startDate = pendingRule.getStartDate();
            if (currentDate > Long.parseLong(startDate)) ruleIds.add(pendingRule.getId());
        }
        // 传一个ids批量更新TM规则时间
        distributeRuleService.updateRulesByIds(ruleIds, status); // 1: 规则生效
    }
}

改动太多,暂时传一个主流程,埋坑以后再填!

0

评论区