A look at pendingTriggers in Storm's TridentWindowManager
Published: 2019-06-19


This post looks at pendingTriggers in Storm's TridentWindowManager.

TridentBoltExecutor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
    boolean success = true;
    try {
        _bolt.finishBatch(tracked.info);
        String stream = COORD_STREAM(tracked.info.batchGroup);
        for(Integer task: tracked.condition.targetTasks) {
            _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
        }
        if(tracked.delayedAck!=null) {
            _collector.ack(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    } catch(FailedException e) {
        failBatch(tracked, e);
        success = false;
    }
    _batches.remove(tracked.info.batchId.getId());
    return success;
}
  • This calls finishBatch on _bolt. _bolt has two implementations: TridentSpoutExecutor, used for spouts, and SubtopologyBolt, used for ordinary bolts.

SubtopologyBolt.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

public void finishBatch(BatchInfo batchInfo) {
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
        p.finishBatch((ProcessorContext) batchInfo.state);
    }
}
  • SubtopologyBolt.finishBatch calls finishBatch on each TridentProcessor of the batch group, in topological order.

WindowTridentProcessor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    // add tuple to the batch state
    Object state = processorContext.state[tridentContext.getStateIndex()];
    ((List<TridentTuple>) state).add(projection.create(tuple));
}

public void finishBatch(ProcessorContext processorContext) {
    Object batchId = processorContext.batchId;
    Object batchTxnId = getBatchTxnId(batchId);

    LOG.debug("Received finishBatch of : [{}] ", batchId);
    // get all the tuples in a batch and add it to trident-window-manager
    List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
    tridentWindowManager.addTuplesBatch(batchId, tuples);

    List<Integer> pendingTriggerIds = null;
    List<String> triggerKeys = new ArrayList<>();
    Iterable<Object> triggerValues = null;

    if (retriedAttempt(batchId)) {
        pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
        if (pendingTriggerIds != null) {
            for (Integer pendingTriggerId : pendingTriggerIds) {
                triggerKeys.add(triggerKey(pendingTriggerId));
            }
            triggerValues = windowStore.get(triggerKeys);
        }
    }

    // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
    if(triggerValues == null) {
        pendingTriggerIds = new ArrayList<>();
        Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
        LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
        try {
            Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
            List<Object> values = new ArrayList<>();
            StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
            while (pendingTriggersIter.hasNext()) {
                triggerResult = pendingTriggersIter.next();
                for (List aggregatedResult : triggerResult.result) {
                    String triggerKey = triggerKey(triggerResult.id);
                    triggerKeys.add(triggerKey);
                    values.add(aggregatedResult);
                    pendingTriggerIds.add(triggerResult.id);
                }
                pendingTriggersIter.remove();
            }
            triggerValues = values;
        } finally {
            // store inprocess triggers of a batch in store for batch retries for any failures
            if (!pendingTriggerIds.isEmpty()) {
                windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
            }
        }
    }

    collector.setContext(processorContext);
    int i = 0;
    for (Object resultValue : triggerValues) {
        collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List) resultValue));
    }
    collector.setContext(null);
}
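  • Once the bolt hosting WindowTridentProcessor has acked every tuple of a batch, it runs finishBatch.
  • WindowTridentProcessor.execute receives one tuple at a time and appends it to the list held in processorContext.state.
  • finishBatch takes that batch of tuples out of processorContext.state and passes it to tridentWindowManager.addTuplesBatch(batchId, tuples).
  • For a new batch (or a retry with nothing saved), it then calls tridentWindowManager.getPendingTriggers() and drains the queue, collecting the triggerValues to emit and saving the matching pendingTriggerIds to the store. For a retried batch whose ids were already saved, it reads the ids and values back from the store instead.
  • Finally it emits each entry of triggerValues, prefixed with a TriggerInfo built from windowTaskId and the trigger id.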
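The drain-and-replay logic above can be sketched in isolation. This is a minimal illustration, not Storm's code: the class, the two maps standing in for windowStore, and the String payloads are all hypothetical. It only shows the idea that a first attempt drains the queue and records which trigger ids it emitted, while a retried attempt replays those same ids from the store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical names throughout; this is a sketch, not Storm's API.
class PendingTriggerDrainSketch {
    // Stand-in for TriggerResult: a trigger id plus its aggregated value.
    static final class Trigger {
        final int id;
        final String value;
        Trigger(int id, String value) { this.id = id; this.value = value; }
    }

    // Stand-ins for windowStore: ids emitted per batch transaction, values per trigger id.
    final Map<Long, List<Integer>> inProcessIds = new HashMap<>();
    final Map<Integer, String> storedValues = new HashMap<>();
    // Filled by the window trigger callback, drained by finishBatch.
    final Queue<Trigger> pendingTriggers = new ConcurrentLinkedQueue<>();

    // Called when a window fires: persist the result, then queue it.
    void onWindowFired(int id, String value) {
        storedValues.put(id, value);
        pendingTriggers.add(new Trigger(id, value));
    }

    // Returns the values to emit for this batch attempt.
    List<String> finishBatch(long txId, boolean retried) {
        if (retried) {
            List<Integer> ids = inProcessIds.get(txId);
            if (ids != null) {
                // Replay exactly what the earlier attempt emitted.
                List<String> replay = new ArrayList<>();
                for (Integer id : ids) {
                    replay.add(storedValues.get(id));
                }
                return replay;
            }
        }
        // New batch, or a retry with nothing saved: drain the queue.
        List<Integer> ids = new ArrayList<>();
        List<String> out = new ArrayList<>();
        Iterator<Trigger> it = pendingTriggers.iterator();
        while (it.hasNext()) {
            Trigger t = it.next();
            ids.add(t.id);
            out.add(t.value);
            it.remove();
        }
        if (!ids.isEmpty()) {
            inProcessIds.put(txId, ids);
        }
        return out;
    }
}
```

Recording the ids per transaction is what lets a replayed batch emit the same window results even though the queue has already been emptied.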

StoreBasedTridentWindowManager.addTuplesBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java

public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
    LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
    List<WindowsStore.Entry> entries = new ArrayList<>();
    for (int i = 0; i < tuples.size(); i++) {
        String key = keyOf(batchId);
        TridentTuple tridentTuple = tuples.get(i);
        entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields)));
    }

    // tuples should be available in store before they are added to window manager
    windowStore.putAll(entries);

    for (int i = 0; i < tuples.size(); i++) {
        String key = keyOf(batchId);
        TridentTuple tridentTuple = tuples.get(i);
        addToWindowManager(i, key, tridentTuple);
    }
}

private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
    TridentTuple actualTuple = null;
    if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
        actualTuple = tridentTuple;
    }
    currentCachedTuplesSize.incrementAndGet();
    windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
}
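  • StoreBasedTridentWindowManager.addTuplesBatch first writes the whole batch to windowStore, then adds the tuples one by one to windowManager through addToWindowManager. Once currentCachedTuplesSize reaches maxCachedTuplesSize, the TridentBatchTuple is added without the tuple itself (actualTuple stays null).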
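To make the "store first, then cache up to a limit" pattern concrete, here is a small sketch. The class, field, and method names are hypothetical, and the resolve method is an assumption of mine about how a null placeholder could be turned back into a tuple; the excerpt above only shows the placeholder being left null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of store-first insertion with a bounded in-memory cache.
class StoreFirstCacheSketch {
    // Stand-in for TridentBatchTuple: cached is null once the cache limit is hit.
    static final class WindowEntry {
        final String batchKey;
        final int index;
        final String cached;
        WindowEntry(String batchKey, int index, String cached) {
            this.batchKey = batchKey;
            this.index = index;
            this.cached = cached;
        }
    }

    final Map<String, String> store = new HashMap<>();   // stand-in for windowStore
    final List<WindowEntry> window = new ArrayList<>();  // stand-in for windowManager
    final Integer maxCached;                             // null means "no limit"
    final AtomicInteger cachedCount = new AtomicInteger();

    StoreFirstCacheSketch(Integer maxCached) {
        this.maxCached = maxCached;
    }

    void addTuplesBatch(String batchKey, List<String> tuples) {
        // Every tuple goes to the store before any reaches the window.
        for (int i = 0; i < tuples.size(); i++) {
            store.put(batchKey + i, tuples.get(i));
        }
        for (int i = 0; i < tuples.size(); i++) {
            String cached = null;
            if (maxCached == null || cachedCount.get() < maxCached) {
                cached = tuples.get(i);
            }
            cachedCount.incrementAndGet();
            window.add(new WindowEntry(batchKey, i, cached));
        }
    }

    // Assumption: a placeholder is resolved by looking the tuple up under key + index.
    String resolve(WindowEntry e) {
        return e.cached != null ? e.cached : store.get(e.batchKey + e.index);
    }
}
```

Writing to the store first means a placeholder entry can always be resolved later, at the cost of a lookup.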

WindowManager.add

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

private final ConcurrentLinkedQueue<Event<T>> queue;

/**
 * Add an event into the window, with {@link System#currentTimeMillis()} as
 * the tracking ts.
 *
 * @param event the event to add
 */
public void add(T event) {
    add(event, System.currentTimeMillis());
}

/**
 * Add an event into the window, with the given ts as the tracking ts.
 *
 * @param event the event to track
 * @param ts the timestamp
 */
public void add(T event, long ts) {
    add(new EventImpl<T>(event, ts));
}

/**
 * Tracks a window event
 *
 * @param windowEvent the window event to track
 */
public void add(Event<T> windowEvent) {
    // watermark events are not added to the queue.
    if (!windowEvent.isWatermark()) {
        queue.add(windowEvent);
    } else {
        LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
    }
    track(windowEvent);
    compactWindow();
}
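  • add wraps the tuple in an Event and appends it to the ConcurrentLinkedQueue (watermark events are not queued), then calls track and compactWindow.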

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/**
 * The callback invoked by the trigger policy.
 */
@Override
public boolean onTrigger() {
    List<Event<T>> windowEvents = null;
    List<T> expired = null;
    try {
        lock.lock();
        /*
         * scan the entire window to handle out of order events in
         * the case of time based windows.
         */
        windowEvents = scanEvents(true);
        expired = new ArrayList<>(expiredEvents);
        expiredEvents.clear();
    } finally {
        lock.unlock();
    }
    List<T> events = new ArrayList<>();
    List<T> newEvents = new ArrayList<>();
    for (Event<T> event : windowEvents) {
        events.add(event.get());
        if (!prevWindowEvents.contains(event)) {
            newEvents.add(event.get());
        }
    }
    prevWindowEvents.clear();
    if (!events.isEmpty()) {
        prevWindowEvents.addAll(windowEvents);
        LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
        windowLifecycleListener.onActivation(events, newEvents, expired);
    } else {
        LOG.debug("No events in the window, skipping onActivation");
    }
    triggerPolicy.reset();
    return !events.isEmpty();
}
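  • onTrigger first calls scanEvents to get windowEvents, then splits them into events (everything in the current window) and newEvents (those not present in the previous window), and finally calls back windowLifecycleListener.onActivation(events, newEvents, expired). If the window is empty, onActivation is skipped.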
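The events/newEvents split is just a diff against the previous window. A minimal sketch, using hypothetical names and plain Strings in place of Event objects:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how "new" events are derived on each trigger.
class NewEventsSketch {
    // Result of one trigger: all events in the window, and those not seen last time.
    static final class Activation {
        final List<String> events;
        final List<String> newEvents;
        Activation(List<String> events, List<String> newEvents) {
            this.events = events;
            this.newEvents = newEvents;
        }
    }

    // Contents of the window at the previous trigger.
    private final List<String> prevWindowEvents = new ArrayList<>();

    Activation onTrigger(List<String> windowEvents) {
        List<String> events = new ArrayList<>(windowEvents);
        List<String> newEvents = new ArrayList<>();
        for (String e : windowEvents) {
            if (!prevWindowEvents.contains(e)) {
                newEvents.add(e);
            }
        }
        // Remember this window for the next trigger.
        prevWindowEvents.clear();
        prevWindowEvents.addAll(windowEvents);
        return new Activation(events, newEvents);
    }
}
```

With a sliding window, consecutive windows overlap, so newEvents is usually a strict subset of events.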

WindowManager.scanEvents

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/**
 * Scan events in the queue, using the expiration policy to check
 * if the event should be evicted or not.
 *
 * @param fullScan if set, will scan the entire queue; if not set, will stop
 *                 as soon as an event not satisfying the expiration policy is found
 * @return the list of events to be processed as a part of the current window
 */
private List<Event<T>> scanEvents(boolean fullScan) {
    LOG.debug("Scan events, eviction policy {}", evictionPolicy);
    List<T> eventsToExpire = new ArrayList<>();
    List<Event<T>> eventsToProcess = new ArrayList<>();
    try {
        lock.lock();
        Iterator<Event<T>> it = queue.iterator();
        while (it.hasNext()) {
            Event<T> windowEvent = it.next();
            Action action = evictionPolicy.evict(windowEvent);
            if (action == EXPIRE) {
                eventsToExpire.add(windowEvent.get());
                it.remove();
            } else if (!fullScan || action == STOP) {
                break;
            } else if (action == PROCESS) {
                eventsToProcess.add(windowEvent);
            }
        }
        expiredEvents.addAll(eventsToExpire);
    } finally {
        lock.unlock();
    }
    eventsSinceLastExpiry.set(0);
    LOG.debug("[{}] events expired from window.", eventsToExpire.size());
    if (!eventsToExpire.isEmpty()) {
        LOG.debug("invoking windowLifecycleListener.onExpiry");
        windowLifecycleListener.onExpiry(eventsToExpire);
    }
    return eventsToProcess;
}
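  • scanEvents iterates over the events in the ConcurrentLinkedQueue and asks the eviction policy about each one. Expired events are removed from the queue and collected in eventsToExpire (and added to expiredEvents); events to process are collected in eventsToProcess, which is the return value.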
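The scan loop can be tried out on bare timestamps. In the sketch below the loop mirrors the excerpt, but the eviction rule (timeEvict) is a simplified, hypothetical time-based policy that I wrote for illustration; it should not be read as Storm's actual policy implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of a queue scan driven by an eviction decision.
class ScanSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    // Assumed rule: older than the window => EXPIRE, in the future => KEEP, else PROCESS.
    static Action timeEvict(long ts, long now, long windowMs) {
        long age = now - ts;
        if (age >= windowMs) {
            return Action.EXPIRE;
        }
        if (age < 0) {
            return Action.KEEP;
        }
        return Action.PROCESS;
    }

    // Removes expired timestamps from the queue (collecting them in expiredOut)
    // and returns the timestamps that belong to the current window.
    static List<Long> scan(Queue<Long> queue, long now, long windowMs,
                           boolean fullScan, List<Long> expiredOut) {
        List<Long> toProcess = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            Action action = timeEvict(ts, now, windowMs);
            if (action == Action.EXPIRE) {
                expiredOut.add(ts);
                it.remove();
            } else if (!fullScan || action == Action.STOP) {
                break; // a partial scan stops at the first non-expired event
            } else if (action == Action.PROCESS) {
                toProcess.add(ts);
            }
            // KEEP: the event stays in the queue but is not part of this window
        }
        return toProcess;
    }
}
```

A full scan keeps going past out-of-order events, which is why onTrigger passes true; a partial scan is only useful for cheaply dropping expired events at the head of the queue.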

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

/**
 * Listener to receive any activation/expiry of windowing events and take further action on them.
 */
class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

    @Override
    public void onExpiry(List<T> expiredEvents) {
        LOG.debug("onExpiry is invoked");
        onTuplesExpired(expiredEvents);
    }

    @Override
    public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
        LOG.debug("onActivation is invoked with events size: [{}]", events.size());
        // trigger occurred, create an aggregation and keep them in store
        int currentTriggerId = triggerId.incrementAndGet();
        execAggregatorAndStoreResult(currentTriggerId, events);
    }
}

private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
    List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

    // run aggregator to compute the result
    AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
    Object state = aggregator.init(currentTriggerId, collector);
    for (TridentTuple resultTuple : resultTuples) {
        aggregator.aggregate(state, resultTuple, collector);
    }
    aggregator.complete(state, collector);

    List<List<Object>> resultantAggregatedValue = collector.values;

    ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
            new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
    windowStore.putAll(entries);

    pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
}
  • onActivation calls execAggregatorAndStoreResult, which runs the window's aggregator, saves the result to windowStore, and adds resultantAggregatedValue to pendingTriggers wrapped in a TriggerResult.
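As a rough illustration of that init/aggregate/complete cycle feeding the pending queue, here is a sketch with a deliberately simplified aggregator interface. The interface, the Count aggregator, and the Result class are hypothetical; Trident's real Aggregator takes a collector and a batch id, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: run an aggregator over a fired window and queue the result.
class ActivationSketch {
    // Simplified aggregator lifecycle; not Trident's Aggregator interface.
    interface Aggregator<S> {
        S init();
        void aggregate(S state, String tuple);
        List<Object> complete(S state);
    }

    // Stand-in for TriggerResult.
    static final class Result {
        final int id;
        final List<Object> value;
        Result(int id, List<Object> value) { this.id = id; this.value = value; }
    }

    // Example aggregator that counts the tuples in the window.
    static final class Count implements Aggregator<int[]> {
        public int[] init() { return new int[] {0}; }
        public void aggregate(int[] state, String tuple) { state[0]++; }
        public List<Object> complete(int[] state) {
            List<Object> out = new ArrayList<>();
            out.add(state[0]);
            return out;
        }
    }

    final AtomicInteger triggerId = new AtomicInteger();
    final Queue<Result> pendingTriggers = new ConcurrentLinkedQueue<>();

    <S> void onActivation(List<String> events, Aggregator<S> aggregator) {
        int currentTriggerId = triggerId.incrementAndGet();
        S state = aggregator.init();
        for (String e : events) {
            aggregator.aggregate(state, e);
        }
        // The result waits here until the next finishBatch picks it up.
        pendingTriggers.add(new Result(currentTriggerId, aggregator.complete(state)));
    }
}
```

A concurrent queue suits this handoff when the trigger callback and finishBatch may run on different threads.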

Summary

  • When the TridentBoltExecutor hosting WindowTridentProcessor receives a tuple from the spout, it calls the processor's execute method, which buffers the tuple in the ProcessorContext. After the chain of processors has executed, the tuple is acked.
  • Once that TridentBoltExecutor has acked every tuple of a batch, checkFinish is triggered and runs finishBatch, which in turn calls finishBatch on each TridentProcessor in the chain (for example WindowTridentProcessor -> ProjectedProcessor -> PartitionPersistProcessor -> EachProcessor -> AggregateProcessor).
  • WindowTridentProcessor.finishBatch takes the batch of tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples), which writes them to windowStore and then adds them to the windowManager's ConcurrentLinkedQueue. It then calls tridentWindowManager.getPendingTriggers(), saves the pendingTriggerIds to the store, collects the triggerValues, and emits each one together with a TriggerInfo.
  • WindowManager.onTrigger is called when the window's time trigger fires. It reads the windowEvents from the windowManager's ConcurrentLinkedQueue and hands them to TridentWindowLifeCycleListener.onActivation.
  • TridentWindowLifeCycleListener.onActivation runs the window aggregator's init, aggregate, and complete steps to produce resultantAggregatedValue and puts it into pendingTriggers. This queue is the link between the window trigger and WindowTridentProcessor.

Reposted from: https://my.oschina.net/go4it/blog/2875814
