public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) { // add tuple to the batch state Object state = processorContext.state[tridentContext.getStateIndex()]; ((List ) state).add(projection.create(tuple)); } public void finishBatch(ProcessorContext processorContext) { Object batchId = processorContext.batchId; Object batchTxnId = getBatchTxnId(batchId); LOG.debug("Received finishBatch of : [{}] ", batchId); // get all the tuples in a batch and add it to trident-window-manager List tuples = (List ) processorContext.state[tridentContext.getStateIndex()]; tridentWindowManager.addTuplesBatch(batchId, tuples); List pendingTriggerIds = null; List triggerKeys = new ArrayList<>(); Iterable
private final ConcurrentLinkedQueue > queue; /** * Add an event into the window, with {@link System#currentTimeMillis()} as * the tracking ts. * * @param event the event to add */ public void add(T event) { add(event, System.currentTimeMillis()); } /** * Add an event into the window, with the given ts as the tracking ts. * * @param event the event to track * @param ts the timestamp */ public void add(T event, long ts) { add(new EventImpl (event, ts)); } /** * Tracks a window event * * @param windowEvent the window event to track */ public void add(Event windowEvent) { // watermark events are not added to the queue. if (!windowEvent.isWatermark()) { queue.add(windowEvent); } else { LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp()); } track(windowEvent); compactWindow(); }
/** * The callback invoked by the trigger policy. */ @Override public boolean onTrigger() { List > windowEvents = null; List expired = null; try { lock.lock(); /* * scan the entire window to handle out of order events in * the case of time based windows. */ windowEvents = scanEvents(true); expired = new ArrayList<>(expiredEvents); expiredEvents.clear(); } finally { lock.unlock(); } List events = new ArrayList<>(); List newEvents = new ArrayList<>(); for (Event event : windowEvents) { events.add(event.get()); if (!prevWindowEvents.contains(event)) { newEvents.add(event.get()); } } prevWindowEvents.clear(); if (!events.isEmpty()) { prevWindowEvents.addAll(windowEvents); LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size()); windowLifecycleListener.onActivation(events, newEvents, expired); } else { LOG.debug("No events in the window, skipping onActivation"); } triggerPolicy.reset(); return !events.isEmpty(); }
/** * Scan events in the queue, using the expiration policy to check * if the event should be evicted or not. * * @param fullScan if set, will scan the entire queue; if not set, will stop * as soon as an event not satisfying the expiration policy is found * @return the list of events to be processed as a part of the current window */ private List > scanEvents(boolean fullScan) { LOG.debug("Scan events, eviction policy {}", evictionPolicy); List eventsToExpire = new ArrayList<>(); List > eventsToProcess = new ArrayList<>(); try { lock.lock(); Iterator > it = queue.iterator(); while (it.hasNext()) { Event windowEvent = it.next(); Action action = evictionPolicy.evict(windowEvent); if (action == EXPIRE) { eventsToExpire.add(windowEvent.get()); it.remove(); } else if (!fullScan || action == STOP) { break; } else if (action == PROCESS) { eventsToProcess.add(windowEvent); } } expiredEvents.addAll(eventsToExpire); } finally { lock.unlock(); } eventsSinceLastExpiry.set(0); LOG.debug("[{}] events expired from window.", eventsToExpire.size()); if (!eventsToExpire.isEmpty()) { LOG.debug("invoking windowLifecycleListener.onExpiry"); windowLifecycleListener.onExpiry(eventsToExpire); } return eventsToProcess; }
/** * Listener to reeive any activation/expiry of windowing events and take further action on them. */ class TridentWindowLifeCycleListener implements WindowLifecycleListener { @Override public void onExpiry(List expiredEvents) { LOG.debug("onExpiry is invoked"); onTuplesExpired(expiredEvents); } @Override public void onActivation(List events, List newEvents, List expired) { LOG.debug("onActivation is invoked with events size: [{}]", events.size()); // trigger occurred, create an aggregation and keep them in store int currentTriggerId = triggerId.incrementAndGet(); execAggregatorAndStoreResult(currentTriggerId, events); } } private void execAggregatorAndStoreResult(int currentTriggerId, List tupleEvents) { List resultTuples = getTridentTuples(tupleEvents); // run aggregator to compute the result AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector); Object state = aggregator.init(currentTriggerId, collector); for (TridentTuple resultTuple : resultTuples) { aggregator.aggregate(state, resultTuple, collector); } aggregator.complete(state, collector); List > resultantAggregatedValue = collector.values; ArrayList entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1), new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue)); windowStore.putAll(entries); pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue)); }