From ae89050d8dd35505d1d215039eef9435aa3516c7 Mon Sep 17 00:00:00 2001 From: XinWei <2718030729@qq.com> Date: Fri, 9 Aug 2024 10:19:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=BD=E6=95=B0=E6=9C=8D=E5=8A=A1=E4=B8=AD?= =?UTF-8?q?=E5=A4=A7=E5=B1=8F(mongodb)=E4=BC=98=E5=8C=96=E4=B8=BA=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E6=9F=A5=E9=87=8D=E5=92=8C=E6=8F=92=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../checkticket/CheckTicketRepository.java | 10 + .../mongodb/saledata/SaleDataRepository.java | 12 +- .../job/ticketing/TicketDataMigration.java | 306 ++++++------------ .../checkticket/CheckTicketService.java | 14 +- .../checkticket/CheckTicketServiceImpl.java | 41 ++- .../service/saledata/SaleDataService.java | 16 +- .../service/saledata/SaleDataServiceImpl.java | 41 ++- .../src/main/resources/application-local.yaml | 1 + 8 files changed, 196 insertions(+), 245 deletions(-) diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/checkticket/CheckTicketRepository.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/checkticket/CheckTicketRepository.java index 5ea96703d..1008b9612 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/checkticket/CheckTicketRepository.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/checkticket/CheckTicketRepository.java @@ -4,8 +4,18 @@ package cn.iocoder.yudao.module.infra.dal.mongodb.checkticket; import cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket; import com.baomidou.dynamic.datasource.annotation.DS; import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.data.mongodb.repository.Query; import org.springframework.stereotype.Repository; +import java.util.List; + @Repository public interface CheckTicketRepository extends MongoRepository { + /** + * 根据dataId数组查找 + * @param dataIdList + * @return java.util.List + */ + @Query(value = "{ 'dataId': { $in: ?0 } }", fields = "{ 'dataId': 1, '_id': 0 }") + List findByDataIdIn(List dataIdList); } diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/saledata/SaleDataRepository.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/saledata/SaleDataRepository.java index 2d6e70bda..5da4f51ee 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/saledata/SaleDataRepository.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/dal/mongodb/saledata/SaleDataRepository.java @@ -1,12 +1,22 @@ package cn.iocoder.yudao.module.infra.dal.mongodb.saledata; +import cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket; import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleData; import com.baomidou.dynamic.datasource.annotation.DS; import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.data.mongodb.repository.Query; import org.springframework.stereotype.Repository; +import java.util.List; + @Repository public interface SaleDataRepository extends MongoRepository { - + /** + * 更具dataId查找售票记录 + * @param dataIdList + * @return java.util.List + */ + @Query(value = "{ 'dataId': { $in: ?0 } }", fields = "{ 'dataId': 1, '_id': 0 }") + List findByDataIdIn(List dataIdList); } diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/job/ticketing/TicketDataMigration.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/job/ticketing/TicketDataMigration.java index bda454500..22f6143f4 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/job/ticketing/TicketDataMigration.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/job/ticketing/TicketDataMigration.java @@ -16,6 +16,7 @@ import cn.iocoder.yudao.module.infra.job.ticketing.vo.TicketingSamplingReqVO; import cn.iocoder.yudao.module.infra.job.ticketing.vo.TicketingSamplingRespVO; import cn.iocoder.yudao.module.infra.service.checkticket.CheckTicketService; import cn.iocoder.yudao.module.infra.service.saledata.SaleDataService; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; @@ -35,9 +36,17 @@ import java.util.Map; public class TicketDataMigration { private static Logger logger = LoggerFactory.getLogger(TicketDataMigration.class); // 目标地址 - private static String OBJECT_URL = "http://shinanlundu.pro.jiutianda.com/joytime-erp-eportal/console/openapi/handler"; + private static final String OBJECT_URL = "http://shinanlundu.pro.jiutianda.com/joytime-erp-eportal/console/openapi/handler"; // 每次读取条数 private static final int PAGE_SIZE = 500; + // 检票请求服务名 + private static final String CHECK_TICKET_SERVICE = "apiCheckService"; + // 售票请求服务名 + private static final String SALE_DATA_SERVICE = "apiSaleService"; + // 检票请求方法名 + private static final String CHECK_TICKET_METHOD = "CheckDetail"; + // 售票请求方法名 + private static final String SALE_DATA_METHOD = "SaleDetail"; @Resource private SaleDataService saleDataService; @Resource @@ -51,25 +60,7 @@ public class TicketDataMigration { int failedCount = 0; int pageNumber = 1; XxlJobHelper.log("抽数服务:mysql销售数据开始抽数..."); - // 准备请求参数 - TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO(); - ticketingSamplingReqVO.setService("apiSaleService"); - ticketingSamplingReqVO.setMethod("SaleDetail"); - // 抽取前一天的所有售票数据 - ticketingSamplingReqVO.setQueryDate(TickingDateUtils.getPreviousDayFormat()); - ticketingSamplingReqVO.setPageNumber(pageNumber); - ticketingSamplingReqVO.setPageSize(PAGE_SIZE); - // 先发起第一次请求 - String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); - // 解析读取响应数据 - TicketingSamplingRespVO ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); - // 如果访问失败则让调度中心重新调用 - if (!ticketingSamplingRespVO.isPassflag()) { - // 让调度中心重新调用 - XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - return; - } + TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); int totalRows = ticketingSamplingRespVO.getTotalRows(); List> mapList = ticketingSamplingRespVO.getDataMapList(); // 总条数小于等于每页条数 则直接按照总条数进行插入即可 @@ -84,15 +75,7 @@ public class TicketDataMigration { int totalPages = ticketingSamplingRespVO.getTotalPages(); // 处理剩余页的数据 for (; pageNumber <= totalPages; pageNumber++) { - ticketingSamplingReqVO.setPageNumber(pageNumber); - str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); - ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); - - if (!ticketingSamplingRespVO.isPassflag()) { - XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - return; - } + ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); mapList = ticketingSamplingRespVO.getDataMapList(); // 对最后一页进行处理,得到最后一页的实际条数 @@ -116,26 +99,7 @@ public class TicketDataMigration { int failedCount = 0; int pageNumber = 1; XxlJobHelper.log("抽数服务:mysql检票数据开始抽数..."); - // 准备请求参数 - TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO(); - ticketingSamplingReqVO.setService("apiCheckService"); - ticketingSamplingReqVO.setMethod("CheckDetail"); - // 抽取前一天的所有检票数据 - ticketingSamplingReqVO.setQueryDate(TickingDateUtils.getPreviousDayFormat()); - ticketingSamplingReqVO.setPageNumber(pageNumber); - ticketingSamplingReqVO.setPageSize(PAGE_SIZE); - // 先发起第一次请求 - String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); - - // 解析读取响应数据 - TicketingSamplingRespVO ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); - // 如果访问失败则让调度中心重新调用 - if (!ticketingSamplingRespVO.isPassflag()) { - // 让调度中心重新调用 - XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - return; - } + TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); int totalRows = ticketingSamplingRespVO.getTotalRows(); List> mapList = ticketingSamplingRespVO.getDataMapList(); // 总条数小于等于每页条数 则直接按照总条数进行插入即可 @@ -150,16 +114,7 @@ public class TicketDataMigration { int totalPages = ticketingSamplingRespVO.getTotalPages(); // 处理剩余页的数据 for (; pageNumber <= totalPages; pageNumber++) { - ticketingSamplingReqVO.setPageNumber(pageNumber); - str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); - ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); - - if (!ticketingSamplingRespVO.isPassflag()) { - XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - return; - } - + ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, TickingDateUtils.getPreviousDayFormat(), pageNumber); mapList = ticketingSamplingRespVO.getDataMapList(); // 对最后一页进行处理,得到最后一页的实际条数 if (pageNumber != totalPages) { @@ -179,64 +134,29 @@ public class TicketDataMigration { */ @XxlJob("getSaleDataByMongoDB") public void getSaleDataByMongoDBHandler() throws Exception { - int failedCount = 0; + int successCount = 0; + int duplicatesCount = 0; int pageNumber = 1; XxlJobHelper.log("抽数服务:mongodb销售数据开始抽数..."); - // 准备请求参数 - TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO(); - ticketingSamplingReqVO.setService("apiSaleService"); - ticketingSamplingReqVO.setMethod("SaleDetail"); - // 抽取前一天的所有售票数据 - ticketingSamplingReqVO.setQueryDate(TickingDateUtils.getPreviousDayFormat()); - ticketingSamplingReqVO.setPageNumber(pageNumber); - ticketingSamplingReqVO.setPageSize(PAGE_SIZE); - // 先发起第一次请求 - String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); - // 解析读取响应数据 - TicketingSamplingRespVO ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); - // 如果访问失败则让调度中心重新调用 - if (!ticketingSamplingRespVO.isPassflag()) { - // 让调度中心重新调用 - XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - return; - } + TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber); int totalRows = ticketingSamplingRespVO.getTotalRows(); - List> mapList = ticketingSamplingRespVO.getDataMapList(); - // 总条数小于等于每页条数 则直接按照总条数进行插入即可 - if (totalRows <= PAGE_SIZE) { - failedCount += insertSaleDataByMongoDB(mapList, totalRows, pageNumber); - } else { - // 总条数大于每页条数 执行完第一次插入后根据返回的页数继续进行请求和插入 - // 插入第一页的数据 - failedCount += insertSaleDataByMongoDB(mapList, PAGE_SIZE, pageNumber); - pageNumber++; - // 获得总页数 - int totalPages = ticketingSamplingRespVO.getTotalPages(); - // 处理剩余页的数据 - for (; pageNumber <= totalPages; pageNumber++) { - ticketingSamplingReqVO.setPageNumber(pageNumber); - str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); - ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); - - if (!ticketingSamplingRespVO.isPassflag()) { - XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - return; - } - - mapList = ticketingSamplingRespVO.getDataMapList(); - // 对最后一页进行处理,得到最后一页的实际条数 - if (pageNumber != totalPages) { - failedCount += insertSaleDataByMongoDB(mapList, PAGE_SIZE, pageNumber); - } else { - failedCount += insertSaleDataByMongoDB(mapList, totalRows % PAGE_SIZE, pageNumber); - } + List> mapList; + // 获得总页数 + int totalPages = ticketingSamplingRespVO.getTotalPages(); + // 处理剩余页的数据 + for (; pageNumber <= totalPages; pageNumber++) { + ticketingSamplingRespVO = this.getUrlResponseData(SALE_DATA_SERVICE, SALE_DATA_METHOD, TickingDateUtils.getNowDayFormat(), pageNumber); + mapList = ticketingSamplingRespVO.getDataMapList(); + List resultNum = saleDataService.insertByMongoDB(BeanUtils.toBean(mapList, SaleData.class)); + if (resultNum == null) { + XxlJobHelper.log("数据批量插入异常!", pageNumber); + } else { + successCount += resultNum.get(0); + duplicatesCount += resultNum.get(1); } - } - XxlJobHelper.log("抽数服务:销售数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount); + XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据重复,{}条数据插入成功!", totalRows, duplicatesCount, successCount); } /** @@ -244,65 +164,30 @@ public class TicketDataMigration { */ @XxlJob("getCheckTicketByMongoDB") public void getCheckTicketByMongoDBHandler() throws Exception { - int failedCount = 0; - int pageNumber = 1; XxlJobHelper.log("抽数服务:mongodb检票数据开始抽数..."); - // 准备请求参数 - TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO(); - ticketingSamplingReqVO.setService("apiCheckService"); - ticketingSamplingReqVO.setMethod("CheckDetail"); - // 抽取前一天的所有检票数据 - ticketingSamplingReqVO.setQueryDate(TickingDateUtils.getPreviousDayFormat()); - ticketingSamplingReqVO.setPageNumber(pageNumber); - ticketingSamplingReqVO.setPageSize(PAGE_SIZE); - // 先发起第一次请求 - String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); - - // 解析读取响应数据 - TicketingSamplingRespVO ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); - // 如果访问失败则让调度中心重新调用 - if (!ticketingSamplingRespVO.isPassflag()) { - // 让调度中心重新调用 - XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - return; - } + int successCount = 0; + int duplicatesCount = 0; + int pageNumber = 1; + String date = TickingDateUtils.getNowDayFormat(); + TicketingSamplingRespVO ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, date, pageNumber); int totalRows = ticketingSamplingRespVO.getTotalRows(); - List> mapList = ticketingSamplingRespVO.getDataMapList(); - // 总条数小于等于每页条数 则直接按照总条数进行插入即可 - if (totalRows <= PAGE_SIZE) { - failedCount += insertCheckTicketByMongoDB(mapList, totalRows, pageNumber); - } else { - // 总条数大于每页条数 执行完第一次插入后根据返回的页数继续进行请求和插入 - // 插入第一页的数据 - failedCount += insertCheckTicketByMongoDB(mapList, PAGE_SIZE, pageNumber); - pageNumber++; - // 获得总页数 - int totalPages = ticketingSamplingRespVO.getTotalPages(); - // 处理剩余页的数据 - for (; pageNumber <= totalPages; pageNumber++) { - ticketingSamplingReqVO.setPageNumber(pageNumber); - str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); - ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); - - if (!ticketingSamplingRespVO.isPassflag()) { - XxlJobHelper.log("抽数服务:销售数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); - XxlJobHelper.handleFail("销售数据抽数失败,请重试"); - return; - } - - mapList = ticketingSamplingRespVO.getDataMapList(); - // 对最后一页进行处理,得到最后一页的实际条数 - if (pageNumber != totalPages) { - failedCount += insertCheckTicketByMongoDB(mapList, PAGE_SIZE, pageNumber); - } else { - failedCount += insertCheckTicketByMongoDB(mapList, totalRows % PAGE_SIZE, pageNumber); - } + List> mapList; + // 获得总页数 + int totalPages = ticketingSamplingRespVO.getTotalPages(); + // 处理剩余页的数据 + for (; pageNumber <= totalPages; pageNumber++) { + ticketingSamplingRespVO = this.getUrlResponseData(CHECK_TICKET_SERVICE, CHECK_TICKET_METHOD, date, pageNumber); + mapList = ticketingSamplingRespVO.getDataMapList(); + List resultNum = checkTicketService.insertByMongoDB(BeanUtils.toBean(mapList, CheckTicket.class)); + if (resultNum == null) { + XxlJobHelper.log("数据批量插入异常!", pageNumber); + } else { + successCount += resultNum.get(0); + duplicatesCount += resultNum.get(1); } - } - XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据插入失败!", totalRows, failedCount); + XxlJobHelper.log("抽数服务:检票数据抽数正常结束!总共:{}条数据,{}条数据重复,{}条数据插入成功!", totalRows, duplicatesCount, successCount); } /** @@ -348,66 +233,55 @@ public class TicketDataMigration { } /** - * 插入销售数据的方法(mongodb) + * 请求目标url的响应数据 * - * @param mapList 数据集合数组 - * @param listSize 实际个数 - * @param pageNumber 每页显示条数 - * @return int 失败条数 + * @param service + * @param method + * @param date + * @param pageNumber + * @return cn.iocoder.yudao.module.infra.job.ticketing.vo.TicketingSamplingRespVO */ - private int insertSaleDataByMongoDB(List> mapList, int listSize, int pageNumber) { - int failedCount = 0; - int repeatingCount = 0; - for (int i = 0; i < listSize; i++) { - Map currentMap = mapList.get(i); - SaleData saleData = BeanUtils.toBean(currentMap, SaleData.class); - switch (saleDataService.createSaleDataByMongoDB(saleData)) { - case "添加成功": - break; - case "重复数据": - repeatingCount++; - XxlJobHelper.log("第{}条数据重复,数据值为:{}", ((pageNumber - 1) * PAGE_SIZE + i + 1), currentMap); - break; - case "Mongo数据库写入异常": - failedCount++; - XxlJobHelper.log("第{}条数据插入失败,数据值为:{}", ((pageNumber - 1) * PAGE_SIZE + i + 1), currentMap); - break; - } + private TicketingSamplingRespVO getUrlResponseData(String service, String method, String date, int pageNumber) { + // 准备请求参数 + TicketingSamplingReqVO ticketingSamplingReqVO = new TicketingSamplingReqVO(); + ticketingSamplingReqVO.setService(service); + ticketingSamplingReqVO.setMethod(method); + ticketingSamplingReqVO.setQueryDate(date); + ticketingSamplingReqVO.setPageNumber(pageNumber); + ticketingSamplingReqVO.setPageSize(PAGE_SIZE); + // 先发起第一次请求 + String str = HttpUtil.post(OBJECT_URL, JSONUtil.toJsonStr(ticketingSamplingReqVO)); + // 解析读取响应数据 + TicketingSamplingRespVO ticketingSamplingRespVO; + try { + ticketingSamplingRespVO = new ObjectMapper().readValue(str, TicketingSamplingRespVO.class); + } catch (JsonProcessingException e) { + XxlJobHelper.log("请求数据解析异常"); + throw new RuntimeException(); } - XxlJobHelper.log("共有{}条数据重复,已跳过存储。", repeatingCount); - return failedCount; + // TODO 需要再优化 + if (ticketingSamplingRespVO != null && !ticketingSamplingRespVO.isPassflag()) { + // 让调度中心重新调用 + XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); + XxlJobHelper.handleFail("销售数据抽数失败,请重试"); + throw new RuntimeException(); + } + return ticketingSamplingRespVO; } /** - * 插入检票数据的方法(mongodb) + * 请求失败消息 * - * @param mapList 数据集合数组 - * @param listSize 实际个数 - * @param pageNumber 每页显示条数 - * @return int 失败条数 + * @param + * @return void */ - private int insertCheckTicketByMongoDB(List> mapList, int listSize, int pageNumber) { - int failedCount = 0; - int repeatingCount = 0; - for (int i = 0; i < listSize; i++) { - Map currentMap = mapList.get(i); - // 把获取到的数据转为插入检票的参数类型 - CheckTicket checkTicket = BeanUtils.toBean(currentMap, CheckTicket.class); - switch (checkTicketService.createCheckTicketByMongoDB(checkTicket)) { - case "添加成功": - break; - case "重复数据": - repeatingCount++; - XxlJobHelper.log("第{}条数据重复,数据值为:{}", ((pageNumber - 1) * PAGE_SIZE + i + 1), currentMap); - break; - case "Mongo数据库写入异常": - failedCount++; - XxlJobHelper.log("第{}条数据插入失败,数据值为:{}", ((pageNumber - 1) * PAGE_SIZE + i + 1), currentMap); - break; - } + private void requestFailedMessage(TicketingSamplingRespVO ticketingSamplingRespVO) { + // 如果访问失败则让调度中心重新调用 + if (!ticketingSamplingRespVO.isPassflag()) { + // 让调度中心重新调用 + XxlJobHelper.log("抽数服务:检票数据抽数失败,返回消息:" + ticketingSamplingRespVO.getRtnMsg()); + XxlJobHelper.handleFail("销售数据抽数失败,请重试"); } - XxlJobHelper.log("共有{}条数据重复,已跳过存储。", repeatingCount); - return failedCount; } } diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketService.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketService.java index 2f493ba2d..6ccc56fb7 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketService.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketService.java @@ -24,5 +24,17 @@ public interface CheckTicketService { */ Long createCheckTicket(@Valid CheckTicketDO createReqVO); - String createCheckTicketByMongoDB(CheckTicket checkTicketSaveReqDTO); + /** + * 查询重复的dataId(mongodb) + * @param dataIdList dataId数组 + * @return java.util.List + */ + List checkDuplicatesByMongoDB(List dataIdList); + + /** + * 批量插入检票数据(mongodb) + * @param checkTicketList + * @return java.util.List 插入数据和重复数据 + */ + List insertByMongoDB(List checkTicketList); } \ No newline at end of file diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketServiceImpl.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketServiceImpl.java index 3cd345b17..6a1ad4081 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketServiceImpl.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/checkticket/CheckTicketServiceImpl.java @@ -12,6 +12,8 @@ import com.baomidou.dynamic.datasource.annotation.Slave; import com.mongodb.MongoException; import org.springframework.data.domain.Example; import org.springframework.data.domain.ExampleMatcher; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; @@ -19,6 +21,7 @@ import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.stream.Collectors; import static org.springframework.data.domain.ExampleMatcher.GenericPropertyMatchers.exact; @@ -45,23 +48,31 @@ public class CheckTicketServiceImpl implements CheckTicketService { } @Override - public String createCheckTicketByMongoDB(CheckTicket checkTicket) { + public List checkDuplicatesByMongoDB(List dataIdList) { + // 查询数据库中存在的 dataId + List existingTickets = checkTicketRepository.findByDataIdIn(dataIdList); + // 把CheckTicket对象列表改为dataId字符串列表 + return existingTickets.stream().map(CheckTicket::getDataId).collect(Collectors.toList()); + } + + @Override + public List insertByMongoDB(List checkTicketList) { + List resultNum = new ArrayList<>(2); + // 把检票数据的dataId提取出来成数组 + List dataIdList = checkTicketList.stream().map(CheckTicket::getDataId).collect(Collectors.toList()); + // 获取重复的dataId + List duplicatesDataId = this.checkDuplicatesByMongoDB(dataIdList); + // 对checkTicketList过滤掉重复的dataId对应的数据,得到需要插入的不重复数据 + List insertCheckTicketList = checkTicketList.stream().filter(ticket -> !duplicatesDataId.contains(ticket.getDataId())).collect(Collectors.toList()); try { - checkTicketRepository.findOne(Example.of(checkTicket, ExampleMatcher.matching(). - withIgnorePaths("_id").withMatcher("dataId", exact()))); - return "重复数据"; - } catch (NoSuchElementException e) { - try { - // 执行插入操作 - checkTicketRepository.insert(checkTicket); - return "添加成功"; - } catch (MongoException e2) { - return "Mongo数据库写入异常"; - } + // 批量插入 + checkTicketRepository.insert(insertCheckTicketList); + } catch (MongoException e){ + return null; } - - - + resultNum.add(insertCheckTicketList.size()); + resultNum.add(duplicatesDataId.size()); + return resultNum; } } \ No newline at end of file diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataService.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataService.java index 16baeb430..04136ae06 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataService.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataService.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.infra.service.saledata; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.infra.controller.saledata.vo.SaleDataPageReqVO; import cn.iocoder.yudao.module.infra.controller.saledata.vo.SaleDataSaveReqVO; +import cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket; import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleData; import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleDataDO; @@ -27,9 +28,16 @@ public interface SaleDataService { Long createSaleData(@Valid SaleDataDO saleDataDO); /** - * 插入售票信息到mongodb中 - * @param saleData - * @return java.lang.String + * 查询重复的dataId(mongodb) + * @param dataIdList dataId数组 + * @return java.util.List */ - String createSaleDataByMongoDB(SaleData saleData); + List checkDuplicatesByMongoDB(List dataIdList); + + /** + * 批量插入检票数据(mongodb) + * @param saleDataList + * @return java.util.List + */ + List insertByMongoDB(List saleDataList); } \ No newline at end of file diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataServiceImpl.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataServiceImpl.java index 9eb59cde4..753439d02 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataServiceImpl.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/saledata/SaleDataServiceImpl.java @@ -5,13 +5,17 @@ import cn.iocoder.yudao.framework.common.util.object.BeanUtils; import cn.iocoder.yudao.framework.common.util.ticket.IdCardUtil; import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX; import cn.iocoder.yudao.module.infra.controller.saledata.vo.SaleDataSaveReqVO; +import cn.iocoder.yudao.module.infra.dal.dataobject.checkticket.CheckTicket; import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleData; import cn.iocoder.yudao.module.infra.dal.dataobject.saledata.SaleDataDO; import cn.iocoder.yudao.module.infra.dal.mongodb.saledata.SaleDataRepository; import cn.iocoder.yudao.module.infra.dal.mysql.saledata.SaleDataMapper; import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.Slave; +import com.mongodb.DuplicateKeyException; import com.mongodb.MongoException; +import org.springframework.data.domain.Example; +import org.springframework.data.domain.ExampleMatcher; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; @@ -22,6 +26,9 @@ import java.text.SimpleDateFormat; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.stream.Collectors; + +import static org.springframework.data.domain.ExampleMatcher.GenericPropertyMatchers.exact; /** @@ -45,15 +52,33 @@ public class SaleDataServiceImpl implements SaleDataService { return saleDataDO.getId(); } - @Override - public String createSaleDataByMongoDB(SaleData saleData) { - try { - saleDataRepository.insert(saleData); - return "添加成功"; - } catch (MongoException e) { - return "Mongo数据库写入异常"; - } + @Override + public List checkDuplicatesByMongoDB(List dataIdList) { + // 查询数据库中存在的 dataId + List existingTickets = saleDataRepository.findByDataIdIn(dataIdList); + // 把CheckTicket对象列表改为dataId字符串列表 + return existingTickets.stream().map(CheckTicket::getDataId).collect(Collectors.toList()); + } + + @Override + public List insertByMongoDB(List saleDataList) { + List resultNum = new ArrayList<>(2); + // 把检票数据的dataId提取出来成数组 + List dataIdList = saleDataList.stream().map(SaleData::getDataId).collect(Collectors.toList()); + // 获取重复的dataId + List duplicatesDataId = this.checkDuplicatesByMongoDB(dataIdList); + // 对checkTicketList过滤掉重复的dataId对应的数据,得到需要插入的不重复数据 + List insertSaleDataList = saleDataList.stream().filter(saleData -> !duplicatesDataId.contains(saleData.getDataId())).collect(Collectors.toList()); + try { + // 批量插入 + saleDataRepository.insert(insertSaleDataList); + } catch (MongoException e){ + return null; + } + resultNum.add(insertSaleDataList.size()); + resultNum.add(duplicatesDataId.size()); + return resultNum; } diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application-local.yaml b/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application-local.yaml index 4ff2872ae..e5f736cc9 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application-local.yaml +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/resources/application-local.yaml @@ -4,6 +4,7 @@ spring: mongodb: uri: mongodb://root:123456@120.46.37.243:27017/admin?authMechanism=SCRAM-SHA-256 database: sn-lundu-db +# uri: mongodb://127.0.0.1:27017/sn-lundu-db? # 数据源配置项 autoconfigure: exclude: