1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| ThreadPoolExecutor realtimeTagRunner = new ThreadPoolExecutor(64, 200, 1, TimeUnit.HOURS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy() );
ThreadLocal<RealtimeTagCache> realtimeTagCacheThreadLocal = ThreadLocal.withInitial(() -> new RealtimeTagCache());
public boolean checkRealtimeRule(final Long userId, final String groupCode, final String ruleStr) {
GroupRealtimeRuleBO ruleBO = JSON.parseObject(ruleStr, GroupRealtimeRuleBO.class); if (ruleBO == null || ruleBO.getExpr() == null) { log.warn("groupCode:{}, check realtime rule failed. ruleStr:{}", groupCode, ruleStr); return false; } ExprNode expr = ruleBO.getExpr(); Set<String> tags = expr.extractVals(); if (tags == null || tags.isEmpty()) { return false; } RealtimeTagCache realtimeTagCache = realtimeTagCacheThreadLocal.get(); Map<String, Object> params = new HashMap<>(); List<String> runTags = new ArrayList<>(); for (String tag : tags) { Object ret = realtimeTagCache.get(userId, tag); if (ret == null) { runTags.add(tag); continue; } params.put(tag, ret); }
long start = System.currentTimeMillis(); if (CollectionUtils.isNotEmpty(runTags)) { Map<String, Future<Object>> runResultFutureMap = new HashMap<>(); for (String runTag : runTags) { AbstractRealtimeTag realtimeTag = realtimeTagManager.get(runTag); if (realtimeTag == null) { log.warn("realtimeTag impl is null, tagname:{}", runTag); continue; } Future<Object> submit = realtimeTagRunner.submit(() -> realtimeTag.run(userId, Collections.emptyMap())); runResultFutureMap.put(runTag, submit); } for (Map.Entry<String, Future<Object>> entry : runResultFutureMap.entrySet()) { String tagName = entry.getKey(); Future<Object> future = entry.getValue(); try { Object o = future.get(3, TimeUnit.SECONDS); params.put(tagName, o); realtimeTagCache.put(userId, tagName, o); } catch (Exception e) { log.error("realtimeTagRunner error, tagName:{}, userId:{}", tagName, userId, e); return false; } } } long end = System.currentTimeMillis(); String returnExpr = expr.toReturnExpr(); Boolean check = ExprUtl.check(returnExpr, params);
if (loggerConfig.isLogEnable("realtime-rule-debug")) { log.info("realtime rule check, userId:{}, group:{}, rule:{}, param:{}, result:{} , cost:{} ms", userId, groupCode, expr.toString(), JSON.toJSONString(params), check, end - start); }
return BooleanUtils.isTrue(check); }
|