[Java] java nginx监控服务程序调度算法实现 →→→→→进入此内容的聊天室

来自 , 2021-03-22, 写在 Java, 查看 108 次.
URL http://www.code666.cn/view/d360a502
  1. package com.wole.monitor;
  2.  
  3. import java.util.HashMap;
  4. import java.util.HashSet;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.Queue;
  8. import java.util.Set;
  9. import java.util.concurrent.Callable;
  10. import java.util.concurrent.CompletionService;
  11. import java.util.concurrent.ConcurrentHashMap;
  12. import java.util.concurrent.Executor;
  13. import java.util.concurrent.ExecutorCompletionService;
  14. import java.util.concurrent.Future;
  15. import java.util.concurrent.LinkedBlockingQueue;
  16. import java.util.concurrent.PriorityBlockingQueue;
  17. import java.util.concurrent.SynchronousQueue;
  18. import java.util.concurrent.ThreadPoolExecutor;
  19. import java.util.concurrent.TimeUnit;
  20. import java.util.concurrent.atomic.AtomicBoolean;
  21. import java.util.concurrent.atomic.AtomicLong;
  22.  
  23. import org.eclipse.jetty.util.ConcurrentHashSet;
  24. import org.slf4j.Logger;
  25. import org.slf4j.LoggerFactory;
  26. import org.springframework.context.ApplicationContext;
  27. import org.springframework.context.support.ClassPathXmlApplicationContext;
  28.  
  29. import com.wole.monitor.dao.ServiceDao;
  30. import com.wole.servicemonitor.util.ServiceUtils;
  31.  
  32. /**
  33.  * 管理并调度某一个服务数据源的监控池
  34.  * @author yzygenuine
  35.  *
  36.  */
  37. public class MonitorsManage {
  38.         private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);
  39.  
  40.         private ServiceDao dao;
  41.  
  42.         /**
  43.          * 执行的一个并发池
  44.          */
  45.         private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
  46.                         new SynchronousQueue<Runnable>());
  47.  
  48.         /**
  49.          *
  50.          */
  51.         private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);
  52.  
  53.         /**
  54.          * 正在执行中的MonitorService集合
  55.          */
  56.         private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();
  57.  
  58.         /**
  59.          * 等待优先级队列
  60.          */
  61.         private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();
  62.  
  63.         /**
  64.          * 执行队列
  65.          */
  66.         private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();
  67.  
  68.         /**
  69.          * 是否关闭
  70.          */
  71.         private AtomicBoolean isClose = new AtomicBoolean(false);
  72.  
  73.         /**
  74.          * 生产者启动时间
  75.          */
  76.         private AtomicLong startTime = new AtomicLong(0);
  77.         /**
  78.          * 相对于启动的间隔时间
  79.          */
  80.         private AtomicLong intervalTime = new AtomicLong(0);
  81.  
  82.         public void close() {
  83.                 logger.info("closing................");
  84.                 isClose.compareAndSet(false, true);
  85.         }
  86.  
  87.        
  88.  
  89.         public void init() {
  90.                 logger.info("初始化");
  91.  
  92.         }
  93.  
  94.         public void work() {
  95.                 logger.info("开始工作");
  96.                 // 生产者启动工作
  97.  
  98.                 Thread productThread = new Thread(new ProductMonitor(1000));
  99.                 // 消费者启动工作
  100.                 Thread consumerThread = new Thread(new ConsumerMonitor(1000));
  101.                 // 回收者启动工作
  102.                 Thread recoverThread = new Thread(new RecoverMonitor(1000));
  103.  
  104.                 // 启动定时加载数据工作
  105.                 Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));
  106.                 productThread.start();
  107.                 consumerThread.start();
  108.                 recoverThread.start();
  109.                 refreshThread.start();
  110.  
  111.         }
  112.  
  113.         /**
  114.          * 生产者
  115.          *
  116.          * @author yzygenuine
  117.          *
  118.          */
  119.         class ProductMonitor implements Runnable {
  120.                 long sleepTime = 1000;
  121.  
  122.                 public ProductMonitor(long sleepTime) {
  123.                         this.sleepTime = sleepTime;
  124.                 }
  125.  
  126.                 @Override
  127.                 public void run() {
  128.                         logger.info("生产者开启工作");
  129.                         // 开始进行定时监控
  130.                         long now = System.currentTimeMillis();
  131.                         long lastTime = now;
  132.                         startTime.addAndGet(now);
  133.                         try {
  134.                                 do {
  135.                                         Thread.sleep(sleepTime);
  136.                                         logger.debug("生产者休息{}ms", sleepTime);
  137.                                         now = System.currentTimeMillis();
  138.                                         intervalTime.addAndGet(now - lastTime);
  139.                                         while (sleepQueue.size() > 0) {
  140.                                                 MonitorService service = sleepQueue.peek();
  141.                                                 if (service.getCurrentTime() - intervalTime.get() < 1) {
  142.                                                         service = sleepQueue.poll();// 出队并检查是否被删除,如果没被删除则进入执行队列
  143.                                                         if (!currentSet.contains(service)) {
  144.                                                                 logger.info("service {} 已被删除,不加入执行队列了", service.toString());
  145.                                                                 continue;
  146.                                                         }
  147.                                                         executeQueue.add(service);
  148.                                                 } else {
  149.                                                         logger.debug("还有{}秒可执行", service.getCurrentTime() - intervalTime.get());
  150.                                                         break;
  151.                                                 }
  152.                                         }
  153.  
  154.                                         if (sleepQueue.size() <= 0) {
  155.                                                 logger.debug("生产队列为空");
  156.                                         }
  157.                                         lastTime = now;
  158.                                 } while (!isClose.get());
  159.                         } catch (Exception e) {
  160.                                 logger.error("", e);
  161.                         }
  162.  
  163.                 }
  164.  
  165.         }
  166.  
  167.         /**
  168.          * 消费者
  169.          *
  170.          * @author yzygenuine
  171.          *
  172.          */
  173.         class ConsumerMonitor implements Runnable {
  174.                 long sleepTime = 1000;
  175.  
  176.                 public ConsumerMonitor(long sleepTime) {
  177.                         this.sleepTime = sleepTime;
  178.                         if (sleepTime < 1000) {
  179.                                 throw new RuntimeException("请配置sleepTime值大一些");
  180.                         }
  181.                 }
  182.  
  183.                 @Override
  184.                 public void run() {
  185.                         logger.info("消费者开启工作");
  186.                         try {
  187.                                 do {
  188.                                         Thread.sleep(sleepTime);
  189.                                         logger.debug("消费者休息{}ms", sleepTime);
  190.                                         while (executeQueue.size() > 0) {
  191.                                                 final MonitorService service = executeQueue.poll();
  192.                                                 completionService.submit(new ExecuteCallable(service));
  193.                                         }
  194.                                         logger.debug("消费队列为空");
  195.                                 } while (!isClose.get());
  196.                         } catch (Exception e) {
  197.                                 logger.error("", e);
  198.                         }
  199.                 }
  200.  
  201.         }
  202.  
  203.         /**
  204.          * 执行回调类
  205.          *
  206.          * @author yzygenuine
  207.          *
  208.          */
  209.         class ExecuteCallable implements Callable<Response> {
  210.                 final MonitorService service;
  211.  
  212.                 public ExecuteCallable(MonitorService service) {
  213.                         this.service = service;
  214.                 }
  215.  
  216.                 @Override
  217.                 public Response call() throws Exception {
  218.                         logger.debug("执行");
  219.                         Map<String, String> r = new HashMap<String, String>();
  220.                         Response response = new Response();
  221.                         response.service = service;
  222.                         response.response = r;
  223.                         Monitor m = MonitorFactory.getMonitor(service);
  224.                         response.isNeedWarn = m.isNeedWarnging(service, r);
  225.                         if (response.isNeedWarn) {
  226.                                 response.isSucToNotify = m.sendNotify(service, r);
  227.                         }
  228.                         return response;
  229.                 }
  230.  
  231.         }
  232.  
  233.         /**
  234.          * 回收者
  235.          *
  236.          * @author yzygenuine
  237.          *
  238.          */
  239.         class RecoverMonitor implements Runnable {
  240.                 private long sleepTime = 1000;
  241.  
  242.                 private long count = 0;
  243.  
  244.                 public RecoverMonitor(long sleepTime) {
  245.                         this.sleepTime = sleepTime;
  246.                         if (sleepTime < 1000) {
  247.                                 throw new RuntimeException("请配置sleepTime值大一些");
  248.                         }
  249.                 }
  250.  
  251.                 @Override
  252.                 public void run() {
  253.                         logger.info("回收者开启工作");
  254.                         try {
  255.                                 do {
  256.                                         // Thread.sleep(sleepTime);
  257.                                         Future<Response> response = completionService.take();
  258.                                         // 重置后进入休眠队列
  259.                                         MonitorService s = response.get().service;
  260.                                         if (!currentSet.contains(s)) {
  261.                                                 logger.info("service {} 已被删除,不回收了", s.toString());
  262.                                                 continue;
  263.                                         }
  264.                                         // 当前程序已运动的时间+相对间隔时间=绝对的间隔时间
  265.                                         s.setCurrentTime(s.getIntervalTime() + intervalTime.get());
  266.                                         sleepQueue.add(s);
  267.                                         count++;
  268.                                         logger.info("回收,当前回收数量:" + count);
  269.                                 } while (!isClose.get());
  270.                         } catch (Exception e) {
  271.                                 logger.error("", e);
  272.                         }
  273.                 }
  274.         }
  275.  
  276.         /**
  277.          * 加载新的数据
  278.          *
  279.          * @author yzygenuine
  280.          *
  281.          */
  282.         class RefreshMonitorService implements Runnable {
  283.                 private long sleepTime = 1000;
  284.                 private ServiceDao dao;
  285.  
  286.                 public RefreshMonitorService(long sleepTime, ServiceDao dao) {
  287.                         this.sleepTime = sleepTime;
  288.                         if (sleepTime < 60000) {
  289.                                 logger.warn("刷新加载数据的间隔时间不能太短");
  290.                                 throw new RuntimeException("刷新加载数据的间隔时间不能太短");
  291.                         }
  292.                         this.dao = dao;
  293.                 }
  294.  
  295.                 private void firstLoad() {
  296.                         List<MonitorService> monitorService = dao.getService();
  297.                         logger.info("加载记录:" + monitorService.size());
  298.  
  299.                         // 将被监控服务加入优先级队列里
  300.                         for (int j = 0; j < monitorService.size(); j++) {
  301.                                 MonitorService service = monitorService.get(j);
  302.                                 // 初始化好时间
  303.                                 service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
  304.                                 currentSet.add(service);
  305.                                 sleepQueue.add(service);
  306.                         }
  307.  
  308.                 }
  309.  
  310.                 @Override
  311.                 public void run() {
  312.                         logger.info("读取新的service开启工作");
  313.                         firstLoad();
  314.                         try {
  315.                                 do {
  316.                                         logger.info("定时加载新的数据监听者休息{}ms", sleepTime);
  317.                                         Thread.sleep(sleepTime);
  318.                                         logger.info("##########开始执行更新数据############");
  319.                                         // 加载新的所有所数据 ,与当前的数据比较
  320.                                         List<MonitorService> deleteList = dao.deleteService();
  321.                                         List<MonitorService> addList = dao.incrementalService();
  322.                                         logger.info("删除旧的数据共:{}", deleteList.size());
  323.                                         currentSet.removeAll(deleteList);
  324.                                         logger.info("增加新的数据共:{}", addList.size());
  325.                                         currentSet.addAll(addList);
  326.                                         logger.info("更新后的currentSet size:{}", currentSet.size());
  327.  
  328.                                         for (MonitorService service : addList) {
  329.                                                 // 初始化绝对间隔时间
  330.                                                 service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
  331.                                                 sleepQueue.add(service);
  332.                                         }
  333.                                         logger.info("########这一轮更新结束");
  334.                                 } while (!isClose.get());
  335.                         } catch (Exception e) {
  336.                                 logger.error("", e);
  337.                         }
  338.                 }
  339.         }
  340.  
  341.         /**
  342.          * 响应的封装类
  343.          *
  344.          * @author yzygenuine
  345.          *
  346.          */
  347.         class Response {
  348.                 public Map<String, String> response;
  349.                 public MonitorService service;
  350.                 public boolean isNeedWarn;
  351.                 public boolean isSucToNotify;
  352.  
  353.         }
  354.  
  355.         public void setDao(ServiceDao dao) {
  356.                 this.dao = dao;
  357.         }
  358.  
  359. }
  360.  
  361. //java/5889

回复 "java nginx监控服务程序调度算法实现"

这儿你可以回复上面这条便签

captcha