EurekaServer
是有缓存机制的;假设没有缓存机制,大家可想而知,一个集群下来 服务的注册、服务的信息拉取,都是一个非常高频的操作,而且需要保证数据安全就需要加锁,在写的时候就不能读了,这样性能是很低的。
EurekaServer
缓存采用 ResponseCache
实现,就 ResponseCacheImpl
一个实现类,ResponseCache
代码如下:
public interface ResponseCache {/*** 缓存失效*/void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);/*** 获取增量的版本号*/AtomicLong getVersionDelta();/*** 获取带有区域的版本Delta*/AtomicLong getVersionDeltaWithRegions();/*** 获取有关应用程序的缓存信息。*/String get(Key key);/*** gzip 的数据格式*/byte[] getGZIP(Key key);/*** 通过停止内部线程并取消注册伺服监视器来关闭此缓存。*/void stop();}
缓存分为两种,代码如下:
/*** 只读缓存*/private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();/*** 读写缓存*/private final LoadingCache<Key, Value> readWriteCacheMap;
说明:
代码如下:
@VisibleForTestingValue getValue(final Key key, boolean useReadOnlyCache) {Value payload = null;try {// <1> 是否使用缓存if (useReadOnlyCache) {// tip: readOnlyCacheMap 是一个只读缓存// <2> 从只读缓存中获取final Value currentPayload = readOnlyCacheMap.get(key);if (currentPayload != null) {payload = currentPayload;} else {// <2.1> 没有从 读写缓存获取// tip: 如果 readWriteCacheMap 缓存也没有怎么办呢?// tip: readWriteCacheMap 是 guava 的 LoadingCache 缓存,可以看创建LoadingCache地方,get 不到的策略处理payload = readWriteCacheMap.get(key);readOnlyCacheMap.put(key, payload);}} else {// <3> 读写缓存中获取payload = readWriteCacheMap.get(key);}} catch (Throwable t) {logger.error("Cannot get value for key : {}", key, t);}return payload;}
说明:
读写缓存实在,ResponseCacheImpl
创建的时候,构建的代码如下:
// ResponseCacheImpl// 构造方法// guava 的 LoadingCachethis.readWriteCacheMap =CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())// <1> 缓存过期时间,默认180秒.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)// <2> 删除后的监听.removalListener(new RemovalListener<Key, Value>() {@Overridepublic void onRemoval(RemovalNotification<Key, Value> notification) {Key removedKey = notification.getKey();if (removedKey.hasRegions()) {Key cloneWithNoRegions = removedKey.cloneWithoutRegions();regionSpecificKeys.remove(cloneWithNoRegions, removedKey);}}})// <3> CacheLoader 是缓存加载(就是 get 不到的时候,就会进入这里).build(new CacheLoader<Key, Value>() {@Overridepublic Value load(Key key) throws Exception {//if (key.hasRegions()) {Key cloneWithNoRegions = key.cloneWithoutRegions();regionSpecificKeys.put(cloneWithNoRegions, key);}// <3.2> 去加载新的数据Value value = generatePayload(key);return value;}});
说明:
serverConfig.getResponseCacheAutoExpirationInSeconds()
是一个缓存过期时间,默认 180 秒。// ResponseCacheImpl#generatePayload(Key key)// 略...boolean isRemoteRegionRequested = key.hasRegions();if (ALL_APPS.equals(key.getName())) {// <1> tip: ALL_APPS 全量获取if (isRemoteRegionRequested) {tracer = serializeAllAppsWithRemoteRegionTimer.start();// <1.1> 获取远程服务的payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));} else {tracer = serializeAllAppsTimer.start();// <1.2> 获取本地服务的// 1、registry.getApplications() 获取全部的 applicationspayload = getPayLoad(key, registry.getApplications());}} else if (ALL_APPS_DELTA.equals(key.getName())) {// <2> tip: ALL_APPS_DELTA 增量获取if (isRemoteRegionRequested) {tracer = serializeDeltaAppsWithRemoteRegionTimer.start();versionDeltaWithRegions.incrementAndGet();versionDeltaWithRegionsLegacy.incrementAndGet();payload = getPayLoad(key,registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));} else {tracer = serializeDeltaAppsTimer.start();// 增量计数 +1versionDelta.incrementAndGet();versionDeltaLegacy.incrementAndGet();// 1、registry.getApplicationDeltas() 从增量队列中获取 applications// 2、获取 valuepayload = getPayLoad(key, registry.getApplicationDeltas());}} else {tracer = serializeOneApptimer.start();payload = getPayLoad(key, registry.getApplication(key.getName()));}break;// 略...
说明:
// AbstractInstanceRegistrypublic Applications getApplications() {boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();if (disableTransparentFallback) {// <1> 本地获取 applicationsreturn getApplicationsFromLocalRegionOnly();} else {// <2> 云服务器的 eureka 集群中获取 applicationsreturn getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.}}
说明:
代码如下:
// ResponseCacheImplprivate String getPayLoad(Key key, Applications apps) {// tip: 将 Applications 转换成,xml 或者 json 格式// tip: 因为我们是有缓存的,所以同一个 apps 缓存,最多可以有2份,因为支持 xml 和 jsonEncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());String result;try {result = encoderWrapper.encode(apps);} catch (Exception e) {logger.error("Failed to encode the payload for all apps", e);return "";}if(logger.isDebugEnabled()) {logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());}return result;}
说明:
代码如下:
// AbstractInstanceRegistry@Deprecatedpublic Applications getApplicationDeltas() {// <1> 缓存未命中计数 +1GET_ALL_CACHE_MISS_DELTA.increment();// <2> Applications 应用信息Applications apps = new Applications();apps.setVersion(responseCache.getVersionDelta().get());Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();// <3> 获取写锁,服务注册的时候获取的是都锁,这个时候就不能注册了write.lock();try {// <4> recentlyChangedQueue 是一个最近修改的队列,默认保留3三分钟Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();logger.debug("The number of elements in the delta queue is : {}",this.recentlyChangedQueue.size());while (iter.hasNext()) {// 获取续约信息Lease<InstanceInfo> lease = iter.next().getLeaseInfo();InstanceInfo instanceInfo = lease.getHolder();logger.debug("The instance id {} is found with status {} and actiontype {}",instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());// tip: applicationInstancesMap 用于去重// applicationInstancesMap 如果为空的时候,才进行添加Application app = applicationInstancesMap.get(instanceInfo.getAppName());if (app == null) {app = new Application(instanceInfo.getAppName());applicationInstancesMap.put(instanceInfo.getAppName(), app);apps.addApplication(app);}// 添加实例信息app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));}// <5> 是否禁用失败回退(回退就会调用vip地址)boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();// <6> 没有禁用进入,然后调用vip注册的信息if (!disableTransparentFallback) {Applications allAppsInLocalRegion = getApplications(false);for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {// 远程注册中心,增量的信息Applications applications = remoteRegistry.getApplicationDeltas();for (Application application : applications.getRegisteredApplications()) {Application appInLocalRegistry =allAppsInLocalRegion.getRegisteredApplications(application.getName());if (appInLocalRegistry == null) {apps.addApplication(application);}}}}//Applications allApps = getApplications(!disableTransparentFallback);// <7> 生成 HashCodeapps.setAppsHashCode(allApps.getReconcileHashCode());return apps;} finally {// 释放锁write.unlock();}}
说明:
略... 和上面的 payload 一样的
完结~
EurekaServer 的 CacheKey 是什么样的?
EurekaServer 支持集中 response 格式?
获取同一个 Application 信息,第一次使用 application/json 获取,第二次采用 xml 获取,第二次会走缓存吗?
EurekaServer 有几级缓存?
讲讲 readOnlyCacheMap、readWriteCacheMap?
readWriteCacheMap 获取为空的时候,他是怎么加载数据的?
readWriteCacheMap 怎么更新缓存?
readOnlyCacheMap 怎么更新缓存?
Applications 容器默认版本号是从几开始?