博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
数据迁移方案 + Elasticsearch在综合搜索列表实现
阅读量:6366 次
发布时间:2019-06-23

本文共 36330 字,大约阅读时间需要 121 分钟。

1 UML

2 迁移实现

2.1 从数据库读取进件id存到文件中

/**
 * HTTP GET entry point for the "export piece ids to files" step.
 *
 * @param id optional id; when absent the no-argument export is run,
 *           otherwise the id is forwarded to the service overload
 */
@RequestMapping(value = "***", method = {RequestMethod.GET})
@ResponseBody
public void writeFile(Integer id) {
    if (id != null) {
        pieceSearchService.writePieceFile(id);
        return;
    }
    pieceSearchService.writePieceFile();
}

// PieceSearchServiceImpl
/** Runs the full export by delegating to the data-move manager. */
public void writePieceFile() {
    dataMoveManager.startWriteFile();
}
// AbstractTemplate    public void startWriteFile() {        readDataAndWriteFile();    }    // DataMoveManager    protected void readDataAndWriteFile() {        pieceInfoWriteFileTask.start();    }        // AbstractTaskTemplate    public void start() {        List
> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoWriteFileTask protected List
> createTask() { //获取进件起始id 并构建任务 //查询所有进件数据(500W+) List
pieceIdAndIdDTOS = pieceSearchDao.queryAllPieceInfo(); List
tasks = this.getTaskFromPieceId(pieceIdAndIdDTOS); return tasks; } // PieceInfoWriteFileTask List
getTaskFromPieceId(List
pieceIdAndIdDTOS) { //任务集合 List
result = Lists.newArrayList(); Integer index = 1; int fileName = 1; List
ids = Lists.newArrayList(); for (PieceIdAndIdDTO pieceIdAndIdDTO : pieceIdAndIdDTOS) { ids.add(pieceIdAndIdDTO); //每 条创建一个任务 if (index % FILE_MAX_NUM == 0) { //创建任务 MyTask myTask = new MyTask(); myTask.setIds(ids); myTask.setFileName(Integer.toString(fileName)); result.add(myTask); ids = Lists.newArrayList(); //一定要new fileName++; } index++; } if (!ids.isEmpty()) { MyTask myTask = new MyTask(); myTask.setIds(ids); myTask.setFileName(Integer.toString(fileName)); result.add(myTask); } return result; } // PieceInfoWriteFileTask protected ExecutorService createExecutorService() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM); return executorService; } // AbstractTaskTemplate private List runTaskForResult(ExecutorService executorService, List
> tasks) { //线程池执行任务 List
> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List
results = Lists.newArrayList(); for (Future
future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoWriteFileTask protected void writeLogFile(List results) { //日志文件名称 String fileName = "writeFile.log"; //文件路径 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行写出 try { FileKit.writeLines(results, filePath, "UTF-8"); } catch (IOException e) { e.printStackTrace(); log.error("class:PieceInfoWriteFileTask method: writeLogFile()" + " 日志信息写出失败, 失败原因:" + e.getMessage()); } }复制代码
// PieceInfoWriteFileTask    class MyTask implements Callable
{ private List
ids; private String fileName; @Override public String call() { String message = "文件生成失败"; try { message = writeFile(ids, fileName); long l2 = System.currentTimeMillis(); } catch (Exception e) { return message + "----失败原因:" + e.getMessage(); } return message; } } // PieceInfoWriteFileTask private String writeFile(List
pieceIdAndId, String fileName) { //文件路径+文件名 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName + ".txt"; try { log.info("文件输出路径:" + filePath); //输出文件 FileKit.writeLines(pieceIdAndId, filePath, "UTF-8"); } catch (IOException e) { return "此文件生成失败:" + fileName + "失败原因:" + e.getMessage(); } return "此文件生成成功:" + fileName; }复制代码

2.2 数据迁移

2.2.1 从文件读取id存到队列中

/** HTTP GET entry point that starts the file-to-queue migration step. */
@RequestMapping(value = "***", method = {RequestMethod.GET})
@ResponseBody
public void dataMove() {
    pieceSearchService.pieceDataMove();
}

// PieceSearchServiceImpl
/** Starts reading the exported id files by delegating to the data-move manager. */
public void pieceDataMove() {
    dataMoveManager.startReadForDb();
}
// AbstractTemplate    public void startReadForDb() {        readFileForDb();    }    // DataMoveManager    protected void readFileForDb() {        pieceInfoDataMoveTask.start();    }        // AbstractTaskTemplate    public void start() {        List
> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoDataMoveTask protected List
> createTask() { //获取此路径下的所有.txt文件 List
files = null; try { files = FileKit.listFilesWithSuffix(new File(ABSOLUTE_PATH + FILE_ROOT_PATH), ".txt"); } catch (Exception e) { e.printStackTrace(); log.error("class:PieceInfoDataMoveTask method: createTask()" + " 创建任务组失败, 失败原因:" + e.getMessage()); } //根据文件构建任务 List
tasks = getTaskWithFile(files); return tasks; } // PieceInfoDataMoveTask private List
getTaskWithFile(List
files) { ArrayList
tasks = Lists.newArrayList(); for (File file : files) { MyTask myTask = new MyTask(); myTask.setFile(file); tasks.add(myTask); } return tasks; } // PieceInfoDataMoveTask private class MyTask implements Callable
{ private File file; @Override public String call() throws Exception { //每次保存的进件Id ArrayList
ids = new ArrayList<>(); //开始时间 long beginTime = System.currentTimeMillis(); //文件名称 String fileName = file.getName(); BufferedReader reader = null; //行数据 String line; int a = 1; try { reader = FileKit.getReader(file, "UTF-8"); while (true) { line = reader.readLine(); if (Func.isEmpty(line)) { break; } //每行数据格式为 id:1121 pieceId:ZPTE1213 String id = line.split("\\s+")[0].split(":")[1]; ids.add(id); //每多少行 执行插入操作 if (a % MAX_SAVE_NUM == 0) { //存入队列 QueueUtil.QUEUE_IDS.put(ids); ids = Lists.newArrayList(); } a++; } } catch (Exception e) { return "读取" + fileName + "文件失败原因:" + e.getMessage(); } finally { if (!ids.isEmpty()) { //存入队列 QueueUtil.QUEUE_IDS.put(ids); } IoKit.close(reader); } long useTime = (System.currentTimeMillis() - beginTime) / 1000; return "读取" + fileName + "文件完毕,耗时:" + useTime + "秒 , 文件共:" + (a - 1) + "条数据"; } } // QueueUtil public class QueueUtil { /** *队列一:存放从文件中读取的id集合 */ public static final LinkedBlockingQueue
> QUEUE_IDS = new LinkedBlockingQueue
>(); /** * 队列二:存放组装数据过后的List
*/ public static final LinkedBlockingQueue
> QUEUE_JSON_OBJECTS = new LinkedBlockingQueue
>(); /** *队列三:存放所有失败id集合 */ public static final LinkedBlockingQueue
> QUEUE_FAIL_IDS = new LinkedBlockingQueue
>(); /** *队列四:存放所有校验过的集合 */ public static final LinkedBlockingQueue
QUEUE_SAVETOMYSQL = new LinkedBlockingQueue
(); /** *队列五:存放所有校验过的集合 */ public static final LinkedBlockingQueue
> QUEUE_SAVETOMONGO = new LinkedBlockingQueue
>(); /** *队列六:存放所有校验过的集合 */ public static final LinkedBlockingQueue
> QUEUE_SAVETOES = new LinkedBlockingQueue< Map
>();}复制代码
// PieceInfoDataMoveTask    protected ExecutorService createExecutorService() {        //创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(Integer.valueOf(MAX_THREAD_NUM));        return executorService;    }        // AbstractTaskTemplate    private List runTaskForResult(ExecutorService executorService, List
> tasks) { //线程池执行任务 List
> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List
results = Lists.newArrayList(); for (Future
future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoDataMoveTask protected void writeLogFile(List results) { }复制代码

2.2.2 消费进件id队列(QUEUE_IDS)

//spring 配置文件加载bean 调用init() 方法    
/** * 启动线程 */ // PieceSearchManager private void init() { //组装JsonObject readQueueForAssembleJsonThread.start(); log.info("线程1启动完毕,正在读取ids队列!!!!"); //校验数据 readQueueForValidateThread.start(); log.info("线程2启动完毕,正在读取List
队列!!!!"); //入库数据 readQueueForSaveMysqlThread.start(); log.info("线程3启动完毕,正在读取saveToMysql队列!!!!"); //入库Mongo readQueueForSaveMongoThread.start(); log.info("线程4启动完毕,正在读取saveToMongo队列!!!!"); //入库es readQueueForSaveESThread.start(); log.info("线程5启动完毕,正在读取saveToES队列!!!!"); //写出失败日志 readQueueForWriteLogThread.start(); log.info("线程6启动完毕,正在读取失败ids队列!!!!"); } 复制代码
public class ReadQueueForAssembleJsonThread extends Thread {    @Autowired    PieceAssembleManager pieceAssembleManager;    @Override    public void run() {        //创建线程池        //任务数        int countThread = Integer.valueOf(new Config().getMaxAssembleJsonThreadNum());        ExecutorService executorService = Executors.newFixedThreadPool(countThread);        for (int i = 0; i < countThread; i++) {            MyTask task = new MyTask();            executorService.execute(task);        }        executorService.shutdown();    }    @Data    class MyTask implements Runnable {        @Override        public void run() {            while (true) {                try {                    //读取队列中的ids                    List
ids = QueueUtil.QUEUE_IDS.take(); if (null != ids) { long l1 = System.currentTimeMillis(); //组装数据 Map
pieceJson_map = pieceAssembleManager.getPieceJson(ids); //存入队列 QueueUtil.QUEUE_JSON_OBJECTS.put(pieceJson_map.get("success")); //存入失败ids队列 QueueUtil.QUEUE_FAIL_IDS.put(pieceJson_map.get("fail")); long l2 = System.currentTimeMillis(); log.info("组装JSON耗时:" + (l2 - l1)," ,队列大小: "+QueueUtil.QUEUE_IDS.size()); ids=null; pieceJson_map=null; } } catch (Exception e) { log.error("组装数据失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } }}复制代码

2.2.3 消费JsonObject队列(QUEUE_JSON_OBJECTS)

public class ReadQueueForValidateThread extends Thread {    @Autowired    PieceSearchManager pieceSearchManager;    @Override    public void run() {        //创建线程池        int countThread = Integer.valueOf(new Config().getMaxValidateThreadNum());        ExecutorService executorService = Executors.newFixedThreadPool(countThread);        List
jsonObjects = Lists.newArrayList(); while (true) { try { //从队列中读取List
List
poll = QueueUtil.QUEUE_JSON_OBJECTS.take(); if (null != poll) { //创建任务 for (int i = 1; i <= poll.size(); i++) { jsonObjects.add(poll.get(i - 1)); if (i % 100 == 0) { MyTask myTask = new MyTask(); myTask.setJsonObjects(jsonObjects); executorService.execute(myTask); jsonObjects = Lists.newArrayList(); } } if (!jsonObjects.isEmpty()) { MyTask myTask = new MyTask(); myTask.setJsonObjects(jsonObjects); executorService.execute(myTask); jsonObjects = Lists.newArrayList(); } log.info("校验队列大小: "+QueueUtil.QUEUE_JSON_OBJECTS.size()); } } catch (Exception e) { e.printStackTrace(); } } } @Data class MyTask implements Runnable { private List
jsonObjects; @Override public void run() { long l = System.currentTimeMillis(); SaveToMysqlDTO dto = new SaveToMysqlDTO(); //mongo队列所需数据 List
toMongos = Lists.newArrayList(); //es队列所需数据 Map
id_PieceJsonDTO =Maps.newHashMap(); List
investigatePieceDTOS = Lists.newArrayList(); List
pieceDetailDTOS = Lists.newArrayList(); List
pieceContactDTOS = Lists.newArrayList(); //失败结果 List
failedJson = Lists.newArrayList(); try { for (JSONObject finalObj : jsonObjects) { Map
map = pieceSearchManager.validatePiece(finalObj); if (map.containsKey("successData")) { pieceSearchManager.savePiecesLast(map.get("successData"), investigatePieceDTOS, pieceContactDTOS, pieceDetailDTOS,id_PieceJsonDTO); //增加id(存入mongo使用) JsonObject jsonObject = JsonKit.parseJsonObject(finalObj.toString()); JsonElement appkey = JsonKit.getValueByPath(jsonObject, MongoPieceManagerImpl.PIECE_CODE_JSON_PATH); jsonObject.add(MongoPieceManagerImpl.MONGO_ID, appkey); toMongos.add(Document.parse(jsonObject.toString())); } else { failedJson.add(((Map) ((Map) map.get("failed").get("data")).get("applicant")).get("intpc_id").toString()); } map=null; } dto.setInvestigatePieceDTOS(investigatePieceDTOS); dto.setPieceContactDTOS(pieceContactDTOS); dto.setPieceDetailDTOS(pieceDetailDTOS); //存入失败队列 QueueUtil.QUEUE_FAIL_IDS.put(failedJson); //存入入库队列 QueueUtil.QUEUE_SAVETOMYSQL.put(dto); QueueUtil.QUEUE_SAVETOMONGO.put(toMongos); QueueUtil.QUEUE_SAVETOES.put(id_PieceJsonDTO); long l1 = System.currentTimeMillis(); log.info("校验耗时:" + (l1 - l)); dto=null; toMongos=null; id_PieceJsonDTO=null; failedJson=null; jsonObjects=null; } catch (Exception e) { log.error("校验失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } }}复制代码

2.2.4 消费待写入 MySQL 的数据队列(QUEUE_SAVETOMYSQL)

public class ReadQueueForSaveMysqlThread extends Thread {    @Autowired    PieceSearchManager pieceSearchManager;    @Override    public void run() {        //创建线程池        //任务数        int countThread = Integer.valueOf(new Config().getMaxSaveMysqlThreadNum());        ExecutorService executorService = Executors.newFixedThreadPool(countThread);        for (int i = 0; i < countThread; i++) {            MyTask task = new MyTask();            executorService.execute(task);        }        executorService.shutdown();    }    @Data    class MyTask implements Runnable {        @Override        public void run() {            while (true) {                try {                    //从队列中读取校验过后的数据                    SaveToMysqlDTO dto = QueueUtil.QUEUE_SAVETOMYSQL.take();                    if (null != dto) {                        //数据入库                        long l = System.currentTimeMillis();                        pieceSearchManager.savePieceToSql(dto);                        long l1 = System.currentTimeMillis();                        log.info("入库Mysql耗时:" + (l1 - l)+" , 队列大小: "+QueueUtil.QUEUE_SAVETOMYSQL.size());                    }                } catch (InterruptedException e) {                    log.error("插入Mysql失败,失败原因:  "+e.getMessage());                    e.printStackTrace();                }            }        }    }}复制代码

2.2.5 消费保存到mongoDB数据队列(QUEUE_SAVETOMONGO)

public class ReadQueueForSaveMongoThread extends Thread {    @Autowired    MongoUtil mongoUtil;    private MongoCollection mongoCollection;    @Override    public void run() {        //获取mongo 链接        mongoCollection = mongoUtil.mongoClient();        //创建线程池        //任务数        int countThread = Integer.valueOf(new Config().getMaxSaveMongoThreadNum());        ExecutorService executorService = Executors.newFixedThreadPool(countThread);        for (int i = 0; i < countThread; i++) {            MyTask task = new MyTask();            executorService.execute(task);        }        executorService.shutdown();    }    @Data    class MyTask implements Runnable {        @Override        public void run() {            while (true) {                try {                    //从队列中读取校验过后的数据                    List
documents = QueueUtil.QUEUE_SAVETOMONGO.take(); if (null != documents) { //数据入库 long l = System.currentTimeMillis(); mongoCollection.insertMany(documents); long l1 = System.currentTimeMillis(); log.info("Mongo耗时:" + (l1 - l)+", 队列大小: "+ QueueUtil.QUEUE_SAVETOMONGO.size()); } } catch (Exception e) { log.error("插入mongo失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } }}复制代码

2.2.6 消费保存到Elasticsearch数据队列(QUEUE_SAVETOES)

public class ReadQueueForSaveESThread extends Thread {    @Autowired    ElasticsearchManager elasticsearchManager;    @Override    public void run() {        //创建线程池        //任务数        int countThread = Integer.valueOf(new Config().getMaxSaveESThreadNum());        ExecutorService executorService = Executors.newFixedThreadPool(countThread);        for (int i = 0; i < countThread; i++) {            MyTask task = new MyTask();            executorService.execute(task);        }        executorService.shutdown();    }    @Data    class MyTask implements Runnable {        @Override        public void run() {            BulkProcessor bulkProcessor = elasticsearchManager.createBulkProcessor();            while (true) {                try {                    //从队列中读取校验过后的数据                    Map
map = QueueUtil.QUEUE_SAVETOES.take(); if (null != map) { for (Long id : map.keySet()) { //创建request blukprocessor PieceJsonDTO pieceJsonDTO = map.get(id); String s = JsonKit.toJsonSerializeNulls(pieceJsonDTO); bulkProcessor.add(elasticsearchManager.createPieceIndexRequest(Long.toString(id),s)); pieceJsonDTO=null; } map = null; log.info("es队列大小:"+QueueUtil.QUEUE_SAVETOES.size()); } } catch (Exception e) { log.error("插入es失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } } }}复制代码

2.2.7 消费失败 id 队列并写入日志文件(QUEUE_FAIL_IDS)

public class ReadQueueForWriteLogThread extends Thread {    /**     * 文件根路径     */    private final static String FILE_ROOT_PATH = new Config().getFileRootPath();    /**     * 项目路径     */    private final static String ABSOLUTE_PATH = FileKit.getAbsolutePath("");    @Override    public void run() {        //日志文件名称        String fileName = "datamove.log";        //文件路径        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;        while (true) {            try {                List
fail_ids = QueueUtil.QUEUE_FAIL_IDS.take(); if (null != fail_ids) { ArrayList
result = Lists.newArrayList(); String s = "失败个数:" + fail_ids.size() + " , 失败id: " + fail_ids; result.add(s); //按行写出 FileKit.appendLines(result, filePath, "UTF-8"); } } catch (Exception e) { log.error("写出失败ids失败,失败原因: "+e.getMessage()); e.printStackTrace(); } } }}复制代码

2.3 失败数据处理

@RequestMapping(value = "***", method = {RequestMethod.GET})    @ResponseBody    public void readLog() {        pieceSearchService.pieceLogFile();    }        // PieceSearchServiceImpl    public void pieceLogFile() {        dataMoveManager.startReadLogForDb();    }        // AbstractTemplate    public void startReadLogForDb() {        readLogFileForDb();    }        // DataMoveManager    protected void readLogFileForDb() {        pieceInfoLogFileTask.start();    }        // AbstractTaskTemplate    public void start() {        List
> task = createTask(); ExecutorService executorService = createExecutorService(); List list = runTaskForResult(executorService, task); writeLogFile(list); } // PieceInfoLogFileTask protected List
> createTask() { //获取 datamove.log路径 String fileName = "datamove.log"; String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行读取 获取文件中 失败id: List
ids = Lists.newArrayList(); try { ids = readLines(new File(filePath), "UTF-8", ids); } catch (IOException e) { e.printStackTrace(); } //创建任务组 if (!Func.isEmpty(ids)) { List
taskWithIds = getTaskWithIds(ids); return taskWithIds; } return null; } // PieceInfoLogFileTask private List
getTaskWithIds(List
ids) { //一百个ids 起一个任务 ArrayList
tasks = Lists.newArrayList(); int a = 1; int beginIndex = 0; while (true) { if (a * 100 >= ids.size()) { MyTask myTask = new MyTask(); myTask.setIds(ids.subList(beginIndex, ids.size())); tasks.add(myTask); break; } MyTask myTask = new MyTask(); //subList() 包左不包右 myTask.setIds(ids.subList(beginIndex, beginIndex + 100)); tasks.add(myTask); beginIndex += 100; a++; } return tasks; } private class MyTask implements Callable
{ private List
ids; @Override public String call() throws Exception { //每次保存的进件Id ArrayList
pieceIds = new ArrayList<>(); //开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < ids.size(); i++) { pieceIds.add(ids.get(i)); //每多少行 执行插入操作 if (i % MAX_SAVE_NUM == 0) { //执行查询保存操作 System.out.println(ids); pieceIds.clear(); } } if (!pieceIds.isEmpty()) { //执行查询保存操作 } long useTime = (System.currentTimeMillis() - beginTime) / 1000; return "日志文件失败进件重新入库完毕,耗时:" + useTime + "秒 , 文件共:" + ids.size() + "条数据, 失败个数: , 失败id:"; } } // PieceInfoLogFileTask protected ExecutorService createExecutorService() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(Integer.valueOf(MAX_THREAD_NUM)); return executorService; } // AbstractTaskTemplate private List runTaskForResult(ExecutorService executorService, List
> tasks) { //线程池执行任务 List
> futures = new ArrayList<>(); try { futures = executorService.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //组装返回结果 List
results = Lists.newArrayList(); for (Future
future : futures) { try { results.add(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return results; } // PieceInfoLogFileTask protected void writeLogFile(List results) { //日志文件名称 String fileName = "datamove.log"; //文件路径 String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName; //按行写出 try { FileKit.writeLines(results, filePath, "UTF-8"); } catch (IOException e) { e.printStackTrace(); log.error("class:PieceInfoDataMoveTask method: writeLogFile()" + " 日志信息写出失败, 失败原因:" + e.getMessage()); } }复制代码

2.4 数据迁移流程图

3 Elasticsearch在综合搜索列表实现

public Page
pagination(final InvestigatePiecePageReqDTO investigatepiecePageReqDTO, final Integer pageNo, final Integer pageSize) { //创建es组合查询器 BoolQueryBuilder mustQuery = QueryBuilders.boolQuery(); //进件编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPieceCode())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceCode.keyword", "*" + investigatepiecePageReqDTO.getPieceCode() + "*")); } //手机号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getMobile())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceContacts.mobile.keyword", "*" + investigatepiecePageReqDTO.getMobile() + "*")); } //身份证号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getIdCard())) { mustQuery.must(QueryBuilders.wildcardQuery("idCard.keyword", "*" + investigatepiecePageReqDTO.getIdCard() + "*")); } //服务网点 if (Func.isNotEmpty(investigatepiecePageReqDTO.getServicePoint())) { mustQuery.must(QueryBuilders.wildcardQuery("servicePoint.keyword", "*" + investigatepiecePageReqDTO.getServicePoint() + "*")); } //座机号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPhone())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceContacts.phone.keyword", "*" + investigatepiecePageReqDTO.getPhone() + "*")); } //单位名称 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCompanyName())) { mustQuery.must(QueryBuilders.wildcardQuery("companyName.keyword", "*" + investigatepiecePageReqDTO.getCompanyName() + "*")); } //工单编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getOrderCode())) { mustQuery.must(QueryBuilders.wildcardQuery("orderCode.keyword", "*" + investigatepiecePageReqDTO.getOrderCode() + "*")); } //案件编号 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCaseCode())) { mustQuery.must(QueryBuilders.wildcardQuery("caseCode.keyword", "*" + investigatepiecePageReqDTO.getCaseCode() + "*")); } //规则搜索 if (Func.isNotEmpty(investigatepiecePageReqDTO.getRuleCode())) { mustQuery.must(QueryBuilders.wildcardQuery("ruleName.keyword", "*" + investigatepiecePageReqDTO.getRuleCode() + "*")); } //客户经理 if 
(Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerManager())) { mustQuery.must(QueryBuilders.wildcardQuery("customerManager.keyword", "*" + investigatepiecePageReqDTO.getCustomerManager() + "*")); } //进件大区 if (Func.isNotEmpty(investigatepiecePageReqDTO.getPieceFromArea())) { mustQuery.must(QueryBuilders.wildcardQuery("pieceFromArea.keyword", "*" + investigatepiecePageReqDTO.getPieceFromArea() + "*")); } //进件状态 if (Func.isNotEmpty(investigatepiecePageReqDTO.getBusinessStatus())) { mustQuery.must(QueryBuilders.termQuery("businessStatus.keyword", investigatepiecePageReqDTO.getBusinessStatus())); } //产品类型 if (Func.isNotEmpty(investigatepiecePageReqDTO.getProductCode())) { mustQuery.must(QueryBuilders.termQuery("productName.keyword", investigatepiecePageReqDTO.getProductCode())); } else { //默认查询当前用户权限下的所有产品类型 List
productCodes = getProductCodeByCurrentUser(); mustQuery.must(QueryBuilders.termsQuery("productName.keyword", productCodes)); } //反欺诈处理人 if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionMaker())) { mustQuery.must(QueryBuilders.wildcardQuery("decisionMaker.keyword", "*" + investigatepiecePageReqDTO.getDecisionMaker() + "*")); } //单位地址 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCompanyAddress())) { mustQuery.must(QueryBuilders.wildcardQuery("companyAddress.keyword", "*" + investigatepiecePageReqDTO.getCompanyAddress() + "*")); } //欺诈报警 if (Func.isNotEmpty(investigatepiecePageReqDTO.getFraudAlarmLevelCode())) { DictionaryDTO dictionaryDTO = DictionaryKit.QZJGLB_MAP().get(investigatepiecePageReqDTO.getFraudAlarmLevelCode()); mustQuery.must(QueryBuilders.termQuery("fraudAlarmLevelName.keyword", dictionaryDTO.getName())); } //决策结果 if (Func.isNotEmpty(investigatepiecePageReqDTO.getResultCode())) { DictionaryDTO dictionaryDTO = DictionaryKit.JCJG_MAP().get(investigatepiecePageReqDTO.getResultCode()); mustQuery.must(QueryBuilders.termQuery("decisionResult.keyword", dictionaryDTO.getName())); } //反欺诈处理状态 if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionStatus())) { DictionaryDTO dictionaryDTO = DictionaryKit.JCZT_MAP().get(investigatepiecePageReqDTO.getDecisionStatus()); mustQuery.must(QueryBuilders.termQuery("decisionStatus.keyword", dictionaryDTO.getName())); } //客服人员 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerService())) { mustQuery.must(QueryBuilders.wildcardQuery("serviceUserName.keyword", "*" + investigatepiecePageReqDTO.getCustomerService() + "*")); } //客户姓名 if (Func.isNotEmpty(investigatepiecePageReqDTO.getCustomerName())) { mustQuery.must(QueryBuilders.wildcardQuery("customerName.keyword", "*" + investigatepiecePageReqDTO.getCustomerName() + "*")); } //团队经理 if (Func.isNotEmpty(investigatepiecePageReqDTO.getTeamManager())) { mustQuery.must(QueryBuilders.wildcardQuery("teamManager.keyword", "*" + 
investigatepiecePageReqDTO.getTeamManager() + "*")); } //进件时间 try { if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeStart()) && Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeStart()).getTime()).to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeEnd()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeStart())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeStart()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getEnterCreditTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getEnterCreditTimeEnd()).getTime())); } } catch (Exception e) { e.printStackTrace(); } //决策时间 try { if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeStart()) && Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeStart()).getTime()).to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeEnd()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeStart())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").from(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeStart()).getTime())); } else if (Func.isNotEmpty(investigatepiecePageReqDTO.getDecisionTimeEnd())) { mustQuery.must(QueryBuilders.rangeQuery("decisionTime").to(SIMPLE_DATE_FORMAT.parse(investigatepiecePageReqDTO.getDecisionTimeEnd()).getTime())); } } catch (Exception e) { e.printStackTrace(); } //是否初始化 if (Boolean.valueOf(investigatepiecePageReqDTO.getIfInit())) { //第一次进入,设置时间为今天 
mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(getTodayStartTime()).to(getTodayEndTime())); } //组装分页 排序条件 SearchSourceBuilder source = new SearchSourceBuilder(); source.query(mustQuery); //排序字段 if (Func.isNotEmpty(investigatepiecePageReqDTO.getSortChangeKey())) { SortOrder sortOrder = "asc".equals(investigatepiecePageReqDTO.getSortChangeWay()) ? SortOrder.ASC : SortOrder.DESC; if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_ENTER_CREDIT_TIME)) { source.sort("enterCreditTime", sortOrder); } else if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_TIME)) { source.sort("decisionTime", sortOrder); } else if (Func.equals(investigatepiecePageReqDTO.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_MAKER)) { source.sort("decisionMaker.keyword", sortOrder); } else { source.sort("enterCreditTime", sortOrder); } } else { source.sort("enterCreditTime", SortOrder.DESC); } //设置分页 注意:es分页默认从0页开始 source.from(pageNo - 1); source.size(pageSize); //查询结果 Page
page = getResponseFromEs(source); return page; }复制代码
private Page
getResponseFromEs(SearchSourceBuilder source) { //创建查询es请求 SearchHits hits; try { SearchResponse response = elasticsearchManager.getPieceResponseWithBuilder(source); hits = response.getHits(); } catch (Exception e) { throw new ServiceException(500, "es搜索引擎连接失败"); } //获取命中数据 SearchHit[] hitsHits = hits.getHits(); //总命中数 Long totalHits = hits.getTotalHits(); //组装前端返回DTO List
result = Lists.newArrayList(); for (SearchHit hitsHit : hitsHits) { InvestigatePiecePageResDTO investigatePiecePageResDTO = new InvestigatePiecePageResDTO(); Map
sourceAsMap = hitsHit.getSourceAsMap(); investigatePiecePageResDTO.setId(Long.valueOf(hitsHit.getId())); investigatePiecePageResDTO.setPieceCode((String) sourceAsMap.get("pieceCode")); investigatePiecePageResDTO.setAppKey((String) sourceAsMap.get("appKey")); Integer decisionId = (Integer) sourceAsMap.get("decisionId"); if (null != decisionId) { investigatePiecePageResDTO.setDecisionId(decisionId.longValue()); } List
orderCode = (List
) sourceAsMap.get("orderCode"); if (Func.isNotEmpty(orderCode)) { investigatePiecePageResDTO.setOrderCode(((List
) sourceAsMap.get("orderCode")).get(0)); } investigatePiecePageResDTO.setCustomerName((String) sourceAsMap.get("customerName")); investigatePiecePageResDTO.setIdCard((String) sourceAsMap.get("idCard")); investigatePiecePageResDTO.setProductName(DictionaryKit.CPLX_MAP().get(sourceAsMap.get("productName")).getName()); investigatePiecePageResDTO.setEnterCreditTime(SIMPLE_DATE_FORMAT.format(new Date((Long) sourceAsMap.get("enterCreditTime")))); investigatePiecePageResDTO.setBusinessStatus((String) sourceAsMap.get("businessStatus")); Long decisionTime = (Long) sourceAsMap.get("decisionTime"); if (null != decisionTime) { investigatePiecePageResDTO.setDecisionTime(SIMPLE_DATE_FORMAT.format(new Date(decisionTime))); } investigatePiecePageResDTO.setDecisionResult((String) sourceAsMap.get("decisionStatus")); investigatePiecePageResDTO.setFraudAlarmLevelName((String) sourceAsMap.get("fraudAlarmLevelName")); investigatePiecePageResDTO.setDecisionMaker((String) sourceAsMap.get("decisionMaker")); investigatePiecePageResDTO.setServicePoint((String) sourceAsMap.get("servicePoint")); String decisionStatus = (String) sourceAsMap.get("decisionStatus"); if ("已决策".equals(decisionStatus)) { investigatePiecePageResDTO.set_disabled(false); } else { investigatePiecePageResDTO.set_disabled(true); } result.add(investigatePiecePageResDTO); } //组装前端分页DTO Page
page = new Page<>(); page.setRecords(result); page.setTotal(totalHits.intValue()); return page; }复制代码

转载地址:http://ejrma.baihongyu.com/

你可能感兴趣的文章
1、集合 2、Iterator迭代器 3、增强for循环 4、泛型
查看>>
关于/var/run/docker.sock
查看>>
SCrapy爬虫大战京东商城
查看>>
用 JavaScript 实现链表操作 - 11 Alternating Split
查看>>
Laravel优秀扩展包整理
查看>>
日志分析之识别真假蜘蛛与处理办法
查看>>
太多脚本将会毁掉持续交付
查看>>
一地鸡毛 OR 绝地反击,2019年区块链发展指南
查看>>
卢森堡大学发布RepuCoin系统,可破解区块链51%攻击
查看>>
国内云计算厂商众生相:四大阵营十几家企业生存盘点
查看>>
细说Unicode(一) Unicode初认识
查看>>
Node.js有了新的管理者
查看>>
Java 20年:历史与未来
查看>>
彻底理解Javascript中的原型链与继承
查看>>
腾讯最大规模裁撤中层干部,让贤年轻人
查看>>
gRPC-Web发布,REST又要被干掉了?
查看>>
如何:强化 TCP/IP 堆栈安全
查看>>
Spring3 MVC中使用Swagger生成API文档
查看>>
FastCGI PHP on Windows Server 2003
查看>>
LimeSDR Getting Started Quickly | LimeSDR上手指南
查看>>