Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 | 1x 1x 1x 1x 1x 1x 1x 26x 26x 2x 2x 1x 1x 1x 1x 22x 22x 22x 2x 22x 22x 11x 11x 3x 3x 3x 3x 3x 1x 11x 9x 9x 9x 5x 5x 5x 4x 4x 4x 2x 2x 2x 4x 2x 7x 7x 7x 4x 4x 4x 4x 4x 4x 3x 3x 4x 4x 5x 5x 5x 5x 7x 32x 35x 7x 20x 50x 59x 50x 17x 17x 5x 5x 5x 6x 3x 6x 6x 6x 3x 6x 1x 1x 1x 1x 1x | #!/usr/bin/env node
/**
* @module Infrastructure/WorkflowOrchestration
* @category Infrastructure
*
* @title Workflow State Coordinator - Multi-Workflow Synchronization Engine
*
* @description
* **INTELLIGENCE OPERATIVE PERSPECTIVE**
*
* This module orchestrates coordination between three independent news generation
* workflows operating on different schedules, preventing wasted computational
* resources on duplicate article generation and maintaining editorial consistency.
* In intelligence operations, workflow state management prevents information
* redundancy and ensures efficient use of computational and editorial resources.
*
* **WORKFLOW ARCHITECTURE:**
* The platform operates three independent content generation workflows:
*
* 1. **Realtime Monitor (news-realtime-monitor.md)**
* Schedule: 2x daily (morning + afternoon)
* Content: Event-driven breaking news, voting updates, crisis response
* Intelligence value: Rapid notification of parliamentary surprises
* Latency: Real-time (5-15 minute response to events)
*
* 2. **Evening Analysis (news-evening-analysis.md)**
* Schedule: Daily at 17:00 (5 PM Swedish time)
* Content: Deep analytical synthesis, international context, forward assessment
* Intelligence value: End-of-day intelligence briefing format
* Latency: Structured analysis (1-2 hour research + writing)
*
* 3. **Article Generators (news-article-generator.md)**
* Schedule: Variable (triggered by content calendar or on-demand)
* Content: Committee reports, motions, propositions, week-ahead
* Intelligence value: Systematic coverage of all parliamentary products
* Latency: Scheduled batch processing (hourly to daily)
*
* **DEDUPLICATION FRAMEWORK:**
* The coordinator prevents duplicate article generation using similarity analysis:
*
* - **Similarity Threshold: 70%**
* Computes Levenshtein distance on article titles and keyword sets
* Articles >70% similar are considered duplicates
* Prevents wasted generation of already-covered topics
*
* - **Time-Window Filtering: 6 hours**
* Checks if similar article was generated in last 6 hours
* Allows coverage of same topic if sufficient time has passed
* Prevents rapid-fire duplicates while allowing topic revisits
*
* - **Topic-Based Tracking**
* Logs article topics (votes, bills, committees, etc.)
* Enables intelligent filtering at generation time
* Supports trending topic analysis
*
* **MCP QUERY CACHING:**
* To avoid redundant API calls to riksdag-regering MCP platform:
*
* - **Cache TTL: 2 hours**
* Stores results of expensive queries (voting patterns, full-text search)
* Reduces MCP server load during peak hours
* Ensures consistency across multiple workflow invocations
*
* - **Query Fingerprinting**
* Creates deterministic hash of MCP query parameters
* Enables cache hits even if queries structured differently
* Supports query normalization
*
* - **Staleness Handling**
* Fresh data (within 2 hours) used for analysis
* Older data triggers MCP refresh
* Prevents stale intelligence from being published
*
* **STATE MANAGEMENT:**
* Persistent state file (news/metadata/workflow-state.json) tracks:
* - Last workflow execution timestamp and results
* - Recently generated articles (content + timestamp)
* - MCP query cache with expiration times
* - Workflow coordination metadata
* - Running task list for cross-workflow visibility
*
* **OPERATIONAL WORKFLOW:**
* 1. Workflow begins: Load current state from persistent storage
* 2. Query Analysis: Check if similar article was recently generated
* 3. Cache Check: Retrieve cached MCP queries if available (<2hr old)
* 4. Generation: Create new article (or skip if duplicate)
* 5. State Update: Log article and update cache
* 6. Persistence: Write updated state for next workflow invocation
*
* **INCIDENT SCENARIOS:**
* - **Double-Generation**: Realtime Monitor and Article Generator both cover voting
* Solution: Similarity detection blocks duplicate, tracks in state
*
* - **Stale Analysis**: Evening Analysis uses MCP data from morning
* Solution: 2-hour cache expiration triggers fresh queries
*
* - **Missed Coverage**: Topic isn't covered by any workflow
* Solution: State logs enable gap analysis, manual workflow triggers
*
* - **Cache Corruption**: Stale query results cause analytical errors
* Solution: TTL-based expiration automatically refreshes
*
* **INTELLIGENCE APPLICATIONS:**
* - Prevents topic redundancy (editorial efficiency)
* - Ensures consistent coverage across workflows
* - Enables gap analysis (which topics are missed?)
* - Supports workflow optimization (timing, triggers)
* - Provides audit trail for editorial decisions
*
* **PERFORMANCE OPTIMIZATION:**
* - MCP cache reduces API calls by estimated 60-70%
* - Reduces computational load on MCP platform during peaks
* - Faster generation cycles (cache lookups faster than API calls)
* - Enables more frequent workflow execution
*
* **FAILURE MODES & RECOVERY:**
* - State file corruption: Graceful fallback to generation without deduplication
* - Cache miss during load: Automatic MCP refresh triggered
* - Timestamp drift: UTC normalization prevents timezone confusion
* - Concurrent workflow execution: Lock-based synchronization
*
* **SCALABILITY CONSIDERATIONS:**
* - State file size grows ~50KB per month (manageable)
* - Cache memory: ~5MB typical, scales with coverage breadth
* - Similarity computation: O(n) in articles, automated pruning at 180 days
* - MCP query cache: Automatic cleanup of expired entries
*
* **GDPR COMPLIANCE:**
* - Member mentions in articles tracked in state
* - Data retention policies enforced (180-day pruning)
* - Audit trail supports member rights requests
* - No personal data stored in cache beyond article references
*
* @osint Workflow Intelligence Analysis
* - Tracks which topics get covered and when
* - Identifies coordination patterns across workflows
* - Enables predictive analysis of future coverage
* - Supports investigation of coordination anomalies
*
* @risk Deduplication Accuracy
* - 70% similarity threshold prevents false positives
* - Enables legitimate retelling of same story (new angle)
* - Detects coordinated coverage (unusual pattern)
* - Monitors for suspicious generation patterns
*
* @gdpr Data Retention & Cleanup
* - Automatic pruning of state after 180 days
* - Member data retention tied to article dates
* - Supports right-to-be-forgotten implementations
* - Audit logging for regulatory compliance
*
* @security State Integrity
* - File permissions protect state from unauthorized modification
* - Checksums validate cache data integrity
* - Atomic writes prevent partial state corruption
* - Versioning enables rollback if needed
*
* @author Hack23 AB (Editorial Operations & Workflow Optimization)
* @license Apache-2.0
* @version 2.2.0
* @since 2024-10-15
* @see news/metadata/workflow-state.json (State Persistence)
* @see Issue #150 (Workflow Coordination Enhancement)
* @see docs/WORKFLOW_ARCHITECTURE.md (Complete Architecture)
*/
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import crypto from 'crypto';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const STATE_FILE = path.join(__dirname, '..', 'news', 'metadata', 'workflow-state.json');
const MCP_CACHE_TTL_SECONDS = 2 * 60 * 60; // 2 hours
const RECENT_ARTICLE_TTL_SECONDS = 6 * 60 * 60; // 6 hours
const SIMILARITY_THRESHOLD = 0.70; // 70% similarity triggers deduplication
/**
* Workflow State Coordinator
*/
export class WorkflowStateCoordinator {
constructor(stateFilePath = STATE_FILE) {
this.stateFilePath = stateFilePath;
this.state = {
lastUpdate: null,
recentArticles: [],
mcpQueryCache: {},
workflows: {}
};
}
/**
* Load state from disk
*/
async load() {
try {
if (fs.existsSync(this.stateFilePath)) {
const content = fs.readFileSync(this.stateFilePath, 'utf-8');
this.state = JSON.parse(content);
this.cleanupExpiredEntries();
} else {
// Initialize empty state
await this.save();
}
} catch (error) {
console.warn('Warning: Could not load workflow state:', error.message);
// Continue with empty state
}
}
/**
* Save state to disk
*/
async save() {
try {
const dir = path.dirname(this.stateFilePath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
this.state.lastUpdate = new Date().toISOString();
fs.writeFileSync(this.stateFilePath, JSON.stringify(this.state, null, 2), 'utf-8');
} catch (error) {
console.error('Error saving workflow state:', error.message);
throw error;
}
}
/**
* Clean up expired cache entries and old articles
*/
cleanupExpiredEntries() {
const now = Date.now();
// Clean MCP cache using per-entry TTL (default: 2 hours)
Object.keys(this.state.mcpQueryCache).forEach(key => {
const entry = this.state.mcpQueryCache[key];
const entryTime = new Date(entry.timestamp).getTime();
// If timestamp is invalid (NaN), treat as expired and delete
Iif (isNaN(entryTime)) {
delete this.state.mcpQueryCache[key];
return;
}
const effectiveTtlSeconds =
typeof entry.ttl === 'number' && entry.ttl > 0
? entry.ttl
: MCP_CACHE_TTL_SECONDS;
if (now - entryTime > effectiveTtlSeconds * 1000) {
delete this.state.mcpQueryCache[key];
}
});
// Clean recent articles (6-hour TTL)
this.state.recentArticles = this.state.recentArticles.filter(article => {
const articleTime = new Date(article.timestamp).getTime();
// If timestamp is invalid (NaN), treat as expired and exclude
Iif (isNaN(articleTime)) {
return false;
}
return (now - articleTime) <= RECENT_ARTICLE_TTL_SECONDS * 1000;
});
}
/**
* Cache MCP query result
*
* @param {string} queryKey - Unique identifier for the query
* @param {any} result - Query result to cache
* @param {number} ttl - Time to live in seconds (default: 2 hours)
*/
async cacheMCPQuery(queryKey, result, ttl = MCP_CACHE_TTL_SECONDS) {
const resultHash = this.hashObject(result);
this.state.mcpQueryCache[queryKey] = {
timestamp: new Date().toISOString(),
ttl,
resultHash,
result
};
await this.save();
}
/**
* Get cached MCP query result
*
* @param {string} queryKey - Unique identifier for the query
* @returns {any|null} Cached result or null if expired/missing
*/
getCachedMCPQuery(queryKey) {
this.cleanupExpiredEntries();
const entry = this.state.mcpQueryCache[queryKey];
if (!entry) return null;
const now = Date.now();
const entryTime = new Date(entry.timestamp).getTime();
// Use per-entry TTL with fallback to default constant
const effectiveTtlSeconds =
typeof entry.ttl === 'number' && entry.ttl > 0
? entry.ttl
: MCP_CACHE_TTL_SECONDS;
Iif (now - entryTime > effectiveTtlSeconds * 1000) {
delete this.state.mcpQueryCache[queryKey];
return null;
}
return entry.result;
}
/**
* Add recent article to tracking
*
* @param {Object} article - Article metadata
*/
async addRecentArticle(article) {
const articleEntry = {
slug: article.slug,
timestamp: new Date().toISOString(),
workflow: article.workflow || 'unknown',
title: article.title,
topics: article.topics || [],
mcpQueries: article.mcpQueries || []
};
this.state.recentArticles.push(articleEntry);
await this.save();
}
/**
* Check if article is duplicate based on similarity
*
* @param {string} title - Article title
* @param {string[]} topics - Article topics
* @param {string[]} mcpQueries - MCP query keys used for this article
* @returns {Object} { isDuplicate: boolean, matchedArticle: Object|null, similarityScore: number }
*/
async checkDuplicateArticle(title, topics = [], mcpQueries = []) {
this.cleanupExpiredEntries();
let maxSimilarity = 0;
let matchedArticle = null;
for (const recentArticle of this.state.recentArticles) {
const similarity = this.calculateSimilarity(
title,
topics,
mcpQueries,
recentArticle.title,
recentArticle.topics,
recentArticle.mcpQueries
);
if (similarity > maxSimilarity) {
maxSimilarity = similarity;
matchedArticle = recentArticle;
}
}
const isDuplicate = maxSimilarity >= SIMILARITY_THRESHOLD;
return {
isDuplicate,
matchedArticle: isDuplicate ? matchedArticle : null,
similarityScore: maxSimilarity
};
}
/**
* Calculate similarity between two articles
*
* Uses weighted combination of:
* - Title similarity (50%)
* - Topic overlap (30%)
* - MCP query overlap (20%)
*
* @returns {number} Similarity score 0.0-1.0
*/
calculateSimilarity(title1, topics1, mcpQueries1, title2, topics2, mcpQueries2) {
const titleSim = this.stringSimilarity(title1, title2);
const topicSim = this.setOverlap(topics1, topics2);
const sourceSim = this.setOverlap(mcpQueries1, mcpQueries2);
return (titleSim * 0.5) + (topicSim * 0.3) + (sourceSim * 0.2);
}
/**
* Calculate string similarity using Jaccard similarity of word sets
*
* @param {string} str1 - First string
* @param {string} str2 - Second string
* @returns {number} Similarity 0.0-1.0
*/
stringSimilarity(str1, str2) {
Iif (!str1 || !str2) return 0;
const words1 = new Set(str1.toLowerCase().split(/\s+/).filter(w => w.length > 2));
const words2 = new Set(str2.toLowerCase().split(/\s+/).filter(w => w.length > 2));
return this.setOverlap([...words1], [...words2]);
}
/**
* Calculate set overlap (Jaccard similarity)
*
* @param {Array} set1 - First set
* @param {Array} set2 - Second set
* @returns {number} Overlap 0.0-1.0
*/
setOverlap(set1, set2) {
if (!set1 || !set2 || set1.length === 0 || set2.length === 0) return 0;
const s1 = new Set(set1.map(x => String(x).toLowerCase()));
const s2 = new Set(set2.map(x => String(x).toLowerCase()));
const intersection = new Set([...s1].filter(x => s2.has(x)));
const union = new Set([...s1, ...s2]);
return intersection.size / union.size;
}
/**
* Hash object for cache comparison
*
* @param {any} obj - Object to hash
* @returns {string} SHA-256 hash
*/
hashObject(obj) {
// Handle null/undefined and non-object inputs safely
// Only use Object.keys for non-null objects, otherwise let JSON.stringify
// use its default behavior
const replacer = (obj !== null && typeof obj === 'object' && !Array.isArray(obj))
? Object.keys(obj).sort()
: undefined;
const str = JSON.stringify(obj, replacer);
return crypto.createHash('sha256').update(str).digest('hex').substring(0, 16);
}
/**
* Record workflow execution
*
* @param {string} workflowName - Name of workflow
* @param {Object} metadata - Execution metadata
*/
async recordWorkflowExecution(workflowName, metadata = {}) {
if (!this.state.workflows[workflowName]) {
this.state.workflows[workflowName] = {
lastRun: null,
runCount: 0,
articlesGenerated: 0
};
}
this.state.workflows[workflowName].lastRun = new Date().toISOString();
this.state.workflows[workflowName].runCount++;
if (metadata.articlesGenerated) {
this.state.workflows[workflowName].articlesGenerated += metadata.articlesGenerated;
}
await this.save();
}
/**
* Get recent articles from last N hours
*
* @param {number} hours - Hours to look back
* @returns {Array} Recent articles
*/
getRecentArticles(hours = 6) {
this.cleanupExpiredEntries();
const cutoff = new Date(Date.now() - hours * 60 * 60 * 1000);
return this.state.recentArticles.filter(article => {
return new Date(article.timestamp) >= cutoff;
});
}
/**
* Get workflow statistics
*
* @returns {Object} Statistics by workflow
*/
getWorkflowStatistics() {
return {
...this.state.workflows,
cacheSize: Object.keys(this.state.mcpQueryCache).length,
recentArticlesCount: this.state.recentArticles.length
};
}
}
// Export for direct usage
export {
MCP_CACHE_TTL_SECONDS,
RECENT_ARTICLE_TTL_SECONDS,
SIMILARITY_THRESHOLD
};
|