spring cloud gateway + nacos 实现 动态路由配置、秒级上下线
spring cloud gateway + nacos 实现 动态路由配置、秒级上下线
众所周知,阿里的 Nacos 注册中心在服务发生变更时,是会向客户端推送变更通知的。
客户端中有一个线程对象 PushReceiver,专门负责接收并处理服务变更的推送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Override public void run () { while (true ) { try { byte [] buffer = new byte [UDP_MSS]; DatagramPacket packet = new DatagramPacket (buffer, buffer.length); udpSocket.receive(packet); String json = new String (IoUtils.tryDecompress(packet.getData()), "UTF-8" ).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushPacket pushPacket = JSON.parseObject(json, PushPacket.class); String ack; if ("dom" .equals(pushPacket.type) || "service" .equals(pushPacket.type)) { hostReactor.processServiceJSON(pushPacket.data); ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}" ; } else if ("dump" .equals(pushPacket.type)) { ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" + "\"" + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap())) + "\"}" ; } else { ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}" ; } udpSocket.send(new DatagramPacket (ack.getBytes(Charset.forName("UTF-8" )), ack.getBytes(Charset.forName("UTF-8" )).length, packet.getSocketAddress())); } catch (Exception e) { NAMING_LOGGER.error("[NA] error while receiving push data" , e); } } }
该线程收到 Nacos 服务端的推送后,会携带获取到的数据调用 HostReactor 对象的 processServiceJSON 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 public ServiceInfo processServiceJSON (String json) { ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class); ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (serviceInfo.getHosts() == null || !serviceInfo.validate()) { return oldService; } boolean changed = false ; if (oldService != null ) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); Map<String, Instance> oldHostMap = new HashMap <String, Instance>(oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { oldHostMap.put(host.toInetAddr(), host); } Map<String, Instance> newHostMap = new HashMap <String, Instance>(serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { newHostMap.put(host.toInetAddr(), host); } Set<Instance> modHosts = new HashSet <Instance>(); Set<Instance> newHosts = new HashSet <Instance>(); Set<Instance> remvHosts = new HashSet <Instance>(); List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList <Map.Entry<String, Instance>>( newHostMap.entrySet()); for (Map.Entry<String, Instance> entry : newServiceHosts) { Instance host = entry.getValue(); String key = entry.getKey(); if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) { modHosts.add(host); continue ; } if (!oldHostMap.containsKey(key)) { newHosts.add(host); } } for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = 
entry.getKey(); if (newHostMap.containsKey(key)) { continue ; } if (!newHostMap.containsKey(key)) { remvHosts.add(host); } } if (newHosts.size() > 0 ) { changed = true ; NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts)); } if (remvHosts.size() > 0 ) { changed = true ; NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts)); } if (modHosts.size() > 0 ) { changed = true ; NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts)); } serviceInfo.setJsonFromServer(json); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0 ) { eventDispatcher.serviceChanged(serviceInfo); DiskCache.write(serviceInfo, cacheDir); } } else { changed = true ; NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON .toJSONString(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); eventDispatcher.serviceChanged(serviceInfo); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(serviceInfo.getHosts())); } return serviceInfo; }
该方法做了简单的处理后,会打印服务变更的信息,再调用 EventDispatcher 的 serviceChanged 方法;而 serviceChanged 方法会把发生变更的服务信息放入 BlockingQueue 队列中。
而 EventDispatcher 对象中存在一个特殊的内部类 Notifier,它是一个线程类(实现了 Runnable),专门处理队列中的信息,并通过调用监听器对象来实现通知:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private class Notifier implements Runnable { @Override public void run () { while (true ) { ServiceInfo serviceInfo = null ; try { serviceInfo = changedServices.poll(5 , TimeUnit.MINUTES); } catch (Exception ignore) { } if (serviceInfo == null ) { continue ; } try { List<EventListener> listeners = observerMap.get(serviceInfo.getKey()); if (!CollectionUtils.isEmpty(listeners)) { for (EventListener listener : listeners) { List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts()); listener.onEvent(new NamingEvent (serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts)); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e); } } } }
到这里已经看完了整个的服务变更通知流程,也知道在哪里可以得到这个通知 并且被我们所用。
前面已经了解了 Nacos 服务变更通知的流程,那么接下来要做的,就是获取服务变更的通知,并据此实现秒级上下线。
通过源码回溯可以知道,EventDispatcher 对象是由 NacosNamingService 对象创建并管理的;再往上,能知道 NacosNamingService 对象又是由 NacosDiscoveryProperties 对象管理的,而这个对象就是 nacos-discovery 的配置信息对象,那么它必然是由 Spring 管理的,所以直接用 @Resource 注入即可。
NacosDiscoveryProperties 管理的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public NamingService namingServiceInstance () { if (null != namingService) { return namingService; } try { namingService = NacosFactory.createNamingService(getNacosProperties()); } catch (Exception e) { log.error("create naming service error!properties={},e=," , this , e); return null ; } return namingService; }
按照上一篇所看到的流程,我还需要一个实现了 EventListener 接口的对象,把它注册为监听器并处理收到的事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Component public class ServiceEventListener implements EventListener { @Resource private NamedContextFactory factory; @Override public void onEvent (Event event) { if (event instanceof NamingEvent){ factory.destroy(); } } }
当监听器收到的事件是服务变更事件(NamingEvent)时,就直接销毁 gateway 的服务表上下文。其实也可以做到精确地更改,但是据我看到的方法,应该只能通过反射去修改内部的 map。
最后获取服务列表并添加监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Resource private NacosDiscoveryProperties discoveryProperties; @Resource private NacosServiceDiscovery serviceDiscovery; @Resource private ServiceEventListener eventListener; private volatile NamingService naming; protected static volatile List<String> services = new ArrayList <>(); protected volatile long time = 30000 ; @Autowired public void init () { try { naming = discoveryProperties.namingServiceInstance(); services = serviceDiscovery.getServices(); services.forEach(this ::addServiceListener); new Thread (()->{ for (;;){ try { Thread.sleep(time); List<String> newServices = serviceDiscovery.getServices(); for (String service : newServices){ if (!services.contains(service)){ services.add(service); addServiceListener(service); } } }catch (Exception e){ e.printStackTrace(); } } }).start(); }catch (Exception e){ e.printStackTrace(); } } public void addServiceListener (String serviceName) { try { naming.subscribe(serviceName, eventListener); }catch (Exception e){ e.printStackTrace(); } }
至此,gateway 的秒级上下线就实现了。