Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 | 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 9x 1x 44x 45x 44x 21x 21x 12x 12x 12x 12x 28x 28x 37x 1x 36x 2x 37x 37x 22x 22x 22x 24x 24x 10x 23x 23x 23x 23x 7x 7x 1x 1x 1x 1x 1x 6x 
6x 6x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 5x 3x 3x 1x 1x 1x 1x 1x 1x 1x 1x 6x 3x 3x 3x 5x 5x 7x 7x 7x 6x 1x 8x 8x 8x 6x 6x 7x 7x 7x 7x 7x 6x 6x 6x 6x 6x 6x 6x 6x 4x 4x 1x 1x 1x 1x 1x 6x 59x 59x 4x 4x 3x 3x 3x 3x 3x 1x 44x 44x 44x 2x 44x 44x 44x 44x 44x 31x 31x 3x 3x 3x 3x 3x 3x 1x 31x 19x 19x 19x 31x 31x 4x 4x 4x 6x 6x 6x 6x 4x 4x 4x 2x 2x 2x 4x 2x 17x 17x 17x 12x 12x 12x 12x 12x 12x 12x 12x 13x 13x 13x 13x 13x 13x 13x 10x 10x 13x 11x 13x 10x 10x 10x 12x 6x 2x 10x 14x 14x 14x 14x 16x 72x 81x 16x 47x 111x 125x 111x 40x 40x 6x 6x 6x 6x 3x 6x 6x 6x 6x 3x 6x 9x 9x 9x 3x 9x 1x 8x 8x 2x 2x 3x 2x 6x 2x 2x 2x 2x 1x | #!/usr/bin/env node
/**
* @module Infrastructure/WorkflowOrchestration
* @category Infrastructure
*
* @title Workflow State Coordinator - Multi-Workflow Synchronization Engine
*
* @description
* **INTELLIGENCE OPERATIVE PERSPECTIVE**
*
* This module orchestrates coordination between three independent news generation
* workflows operating on different schedules, preventing wasted computational
* resources on duplicate article generation and maintaining editorial consistency.
* In intelligence operations, workflow state management prevents information
* redundancy and ensures efficient use of computational and editorial resources.
*
* **WORKFLOW ARCHITECTURE:**
* The platform operates three independent content generation workflows:
*
* 1. **Realtime Monitor (news-realtime-monitor.md)**
* Schedule: 2x daily (morning + afternoon)
* Content: Event-driven breaking news, voting updates, crisis response
* Intelligence value: Rapid notification of parliamentary surprises
* Latency: Real-time (5-15 minute response to events)
*
* 2. **Evening Analysis (news-evening-analysis.md)**
* Schedule: Daily at 17:00 (5 PM Swedish time)
* Content: Deep analytical synthesis, international context, forward assessment
* Intelligence value: End-of-day intelligence briefing format
* Latency: Structured analysis (1-2 hour research + writing)
*
* 3. **Article Generators (news-article-generator.md)**
* Schedule: Variable (triggered by content calendar or on-demand)
* Content: Committee reports, motions, propositions, week-ahead
* Intelligence value: Systematic coverage of all parliamentary products
* Latency: Scheduled batch processing (hourly to daily)
*
* **DEDUPLICATION FRAMEWORK:**
* The coordinator prevents duplicate article generation using similarity analysis:
*
* - **Similarity Threshold: 70%**
* Computes Levenshtein distance on article titles and keyword sets
* Articles >70% similar are considered duplicates
* Prevents wasted generation of already-covered topics
*
* - **Time-Window Filtering: 6 hours**
* Checks if similar article was generated in last 6 hours
* Allows coverage of same topic if sufficient time has passed
* Prevents rapid-fire duplicates while allowing topic revisits
*
* - **Topic-Based Tracking**
* Logs article topics (votes, bills, committees, etc.)
* Enables intelligent filtering at generation time
* Supports trending topic analysis
*
* **MCP QUERY CACHING:**
* To avoid redundant API calls to riksdag-regering MCP platform:
*
* - **Cache TTL: 2 hours**
* Stores results of expensive queries (voting patterns, full-text search)
* Reduces MCP server load during peak hours
* Ensures consistency across multiple workflow invocations
*
* - **Query Fingerprinting**
* Creates deterministic hash of MCP query parameters
* Enables cache hits even if queries structured differently
* Supports query normalization
*
* - **Staleness Handling**
* Fresh data (within 2 hours) used for analysis
* Older data triggers MCP refresh
* Prevents stale intelligence from being published
*
* **STATE MANAGEMENT:**
* Persistent state file (news/metadata/workflow-state.json) tracks:
* - Last workflow execution timestamp and results
* - Recently generated articles (content + timestamp)
* - MCP query cache with expiration times
* - Workflow coordination metadata
* - Running task list for cross-workflow visibility
*
* **OPERATIONAL WORKFLOW:**
* 1. Workflow begins: Load current state from persistent storage
* 2. Query Analysis: Check if similar article was recently generated
* 3. Cache Check: Retrieve cached MCP queries if available (<2hr old)
* 4. Generation: Create new article (or skip if duplicate)
* 5. State Update: Log article and update cache
* 6. Persistence: Write updated state for next workflow invocation
*
* **INCIDENT SCENARIOS:**
* - **Double-Generation**: Realtime Monitor and Article Generator both cover voting
* Solution: Similarity detection blocks duplicate, tracks in state
*
* - **Stale Analysis**: Evening Analysis uses MCP data from morning
* Solution: 2-hour cache expiration triggers fresh queries
*
* - **Missed Coverage**: Topic isn't covered by any workflow
* Solution: State logs enable gap analysis, manual workflow triggers
*
* - **Cache Corruption**: Stale query results cause analytical errors
* Solution: TTL-based expiration automatically refreshes
*
* **INTELLIGENCE APPLICATIONS:**
* - Prevents topic redundancy (editorial efficiency)
* - Ensures consistent coverage across workflows
* - Enables gap analysis (which topics are missed?)
* - Supports workflow optimization (timing, triggers)
* - Provides audit trail for editorial decisions
*
* **PERFORMANCE OPTIMIZATION:**
* - MCP cache reduces API calls by estimated 60-70%
* - Reduces computational load on MCP platform during peaks
* - Faster generation cycles (cache lookups faster than API calls)
* - Enables more frequent workflow execution
*
* **FAILURE MODES & RECOVERY:**
* - State file corruption: Graceful fallback to generation without deduplication
* - Cache miss during load: Automatic MCP refresh triggered
* - Timestamp drift: UTC normalization prevents timezone confusion
* - Concurrent workflow execution: Lock-based synchronization
*
* **SCALABILITY CONSIDERATIONS:**
* - State file size grows ~50KB per month (manageable)
* - Cache memory: ~5MB typical, scales with coverage breadth
* - Similarity computation: O(n) in articles, automated pruning at 180 days
* - MCP query cache: Automatic cleanup of expired entries
*
* **GDPR COMPLIANCE:**
* - Member mentions in articles tracked in state
* - Data retention policies enforced (180-day pruning)
* - Audit trail supports member rights requests
* - No personal data stored in cache beyond article references
*
* @osint Workflow Intelligence Analysis
* - Tracks which topics get covered and when
* - Identifies coordination patterns across workflows
* - Enables predictive analysis of future coverage
* - Supports investigation of coordination anomalies
*
* @risk Deduplication Accuracy
* - 70% similarity threshold prevents false positives
* - Enables legitimate retelling of same story (new angle)
* - Detects coordinated coverage (unusual pattern)
* - Monitors for suspicious generation patterns
*
* @gdpr Data Retention & Cleanup
* - Automatic pruning of state after 180 days
* - Member data retention tied to article dates
* - Supports right-to-be-forgotten implementations
* - Audit logging for regulatory compliance
*
* @security State Integrity
* - File permissions protect state from unauthorized modification
* - Checksums validate cache data integrity
* - Atomic writes prevent partial state corruption
* - Versioning enables rollback if needed
*
* @author Hack23 AB (Editorial Operations & Workflow Optimization)
* @license Apache-2.0
* @version 2.2.0
* @since 2024-10-15
* @see news/metadata/workflow-state.json (State Persistence)
* @see Issue #150 (Workflow Coordination Enhancement)
* @see docs/WORKFLOW_ARCHITECTURE.md (Complete Architecture)
*/
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import crypto from 'crypto';
import type {
WorkflowState,
MCPCacheEntry,
RecentArticleEntry,
RecentArticleInput,
DuplicateCheckResult,
WorkflowExecutionMetadata,
WorkflowRecord,
WorkflowStatistics,
LockInfo,
ActiveGeneration,
} from './types/workflow.js';
// ESM equivalents of CommonJS __filename/__dirname.
const __filename: string = fileURLToPath(import.meta.url);
const __dirname: string = path.dirname(__filename);
// Persistent state file and lock directory, resolved relative to this module.
const STATE_FILE: string = path.join(__dirname, '..', 'news', 'metadata', 'workflow-state.json');
const LOCK_DIR: string = path.join(__dirname, '..', 'news', 'metadata', 'locks');
const MCP_CACHE_TTL_SECONDS: number = 2 * 60 * 60; // 2 hours
const MCP_CACHE_TTL_NON_PLENARY_SECONDS: number = 4 * 60 * 60; // 4 hours
const RECENT_ARTICLE_TTL_SECONDS: number = 6 * 60 * 60; // 6 hours
const SIMILARITY_THRESHOLD: number = 0.70; // 70% similarity triggers deduplication
const TOPIC_JACCARD_THRESHOLD: number = 0.50; // 50% topic overlap triggers deduplication
const LOCK_TIMEOUT_MS: number = 45 * 60 * 1000; // 45 minutes
const ACTIVE_GENERATION_TTL_MS: number = 45 * 60 * 1000; // 45 minutes
// rename() error codes treated as retriable via backup-then-rename in save().
const RETRIABLE_RENAME_CODES: ReadonlySet<string> = new Set(['EEXIST', 'EPERM', 'EACCES', 'EXDEV']);
/**
 * Type guard: true only for non-null, non-array object values —
 * i.e. values that can safely be treated as string-keyed records.
 */
function isPlainObject(v: unknown): v is Record<string, unknown> {
  if (v === null || Array.isArray(v)) {
    return false;
  }
  return typeof v === 'object';
}
// Reusable formatter extracting the hour-of-day (00-23) in Europe/Stockholm
// local time. Intl handles DST transitions, so plenary-hours detection in
// getAdaptiveCacheTTL() stays correct year-round.
const STOCKHOLM_HOUR_FORMATTER: Intl.DateTimeFormat = new Intl.DateTimeFormat('en-GB', {
  timeZone: 'Europe/Stockholm',
  hour: '2-digit',
  hourCycle: 'h23',
});
/**
* Compute Jaccard similarity between two topic arrays.
*
* @param a - First topic array
* @param b - Second topic array
* @returns Jaccard similarity 0.0-1.0
*/
export function jaccardTopicSimilarity(a: string[], b: string[]): number {
  // Case-insensitive comparison: normalize both sides to lowercase sets.
  const left: Set<string> = new Set(a.map((topic: string) => topic.toLowerCase()));
  const right: Set<string> = new Set(b.map((topic: string) => topic.toLowerCase()));
  const union: Set<string> = new Set([...left, ...right]);
  // Two empty topic lists are defined as having zero similarity.
  if (union.size === 0) {
    return 0;
  }
  let shared: number = 0;
  for (const topic of left) {
    if (right.has(topic)) {
      shared += 1;
    }
  }
  return shared / union.size;
}
/**
* Return adaptive MCP cache TTL based on Riksdag plenary hours.
*
* Plenary hours: 08:00–16:00 Europe/Stockholm local time (DST-aware) → 2-hour TTL.
* Non-plenary hours → 4-hour TTL (data changes less frequently).
*
* @param now - Optional Date for testing
* @returns TTL in seconds
*/
export function getAdaptiveCacheTTL(now?: Date): number {
  const instant: Date = now ?? new Date();
  const hourInStockholm: number = Number.parseInt(STOCKHOLM_HOUR_FORMATTER.format(instant), 10);
  // Outside the 08:00–16:00 plenary window parliamentary data changes
  // less frequently, so a longer TTL is acceptable.
  if (hourInStockholm < 8 || hourInStockholm > 16) {
    return MCP_CACHE_TTL_NON_PLENARY_SECONDS;
  }
  return MCP_CACHE_TTL_SECONDS;
}
/**
* Workflow Lock Manager — file-based soft locks for cross-workflow coordination.
*
* Locks are directories under `news/metadata/locks/{type}-{date}.lock/`
* containing an `info.json` file with lease metadata.
*/
export class WorkflowLockManager {
private readonly lockDir: string;
private readonly timeoutMs: number;
constructor(lockDir: string = LOCK_DIR, timeoutMs: number = LOCK_TIMEOUT_MS) {
this.lockDir = lockDir;
this.timeoutMs = timeoutMs;
}
private validateLockInputs(type: string, date: string): void {
if (!/^[a-z0-9-]+$/.test(type)) {
throw new Error(`Invalid lock type "${type}"`);
}
if (!/^\d{4}-\d{2}-\d{2}$/.test(date)) {
throw new Error(`Invalid lock date "${date}"`);
}
}
private getLockPath(type: string, date: string): string {
this.validateLockInputs(type, date);
return path.join(this.lockDir, `${type}-${date}.lock`);
}
/**
* Acquire a soft lock for the given type + date.
* Uses `mkdirSync({ recursive: false })` for atomic creation on POSIX.
*
* @returns true if lock acquired, false if already held
*/
acquireLock(type: string, date: string, workflowId: string): boolean {
const lockPath: string = this.getLockPath(type, date);
const maxReclaims: number = 1; // keep configurable if policy changes
for (let reclaimAttempts: number = 0; reclaimAttempts <= maxReclaims; reclaimAttempts += 1) {
try {
// Ensure parent directory exists
if (!fs.existsSync(this.lockDir)) {
fs.mkdirSync(this.lockDir, { recursive: true });
}
// Atomic directory creation — fails if already exists.
// Note: atomic on local POSIX filesystems; not guaranteed on NFS/distributed FS.
fs.mkdirSync(lockPath, { recursive: false });
const info: LockInfo = {
workflowId,
acquiredAt: new Date().toISOString(),
expiresAfterMs: this.timeoutMs,
};
fs.writeFileSync(path.join(lockPath, 'info.json'), JSON.stringify(info, null, 2), 'utf-8');
return true;
} catch (err: unknown) {
const error: NodeJS.ErrnoException = err as NodeJS.ErrnoException;
if (error?.code !== 'EEXIST') {
const details: string[] = [
'[WorkflowLockManager] Failed to acquire workflow lock',
`lockPath=${lockPath}`,
];
Eif (typeof error?.code === 'string') details.push(`code=${error.code}`);
Eif (typeof error?.message === 'string') details.push(`message=${error.message}`);
console.error(details.join(' | '));
throw err;
}
let reclaimed: boolean = false;
const infoPath: string = path.join(lockPath, 'info.json');
if (fs.existsSync(infoPath)) {
try {
const raw: string = fs.readFileSync(infoPath, 'utf-8');
const existing: LockInfo = JSON.parse(raw) as LockInfo;
const acquiredAtMs: number = new Date(existing.acquiredAt).getTime();
const hasValidAcquiredAt: boolean = Number.isFinite(acquiredAtMs);
const hasExpiresAfterMs: boolean = Object.prototype.hasOwnProperty.call(
existing,
'expiresAfterMs',
);
const hasValidExpiresAfterMs: boolean =
hasExpiresAfterMs &&
typeof existing.expiresAfterMs === 'number' &&
Number.isFinite(existing.expiresAfterMs) &&
existing.expiresAfterMs > 0;
const expiryMs: number = (hasValidExpiresAfterMs && typeof existing.expiresAfterMs === 'number') ? existing.expiresAfterMs : this.timeoutMs;
const isExpired: boolean = hasValidAcquiredAt && Date.now() - acquiredAtMs > expiryMs;
const treatAsCorrupt: boolean = !hasValidAcquiredAt || (hasExpiresAfterMs && !hasValidExpiresAfterMs);
if ((isExpired || treatAsCorrupt) && reclaimAttempts < maxReclaims) {
// Stale or corrupt lock — reclaim so workflows aren't blocked indefinitely.
fs.rmSync(lockPath, { recursive: true, force: true });
reclaimed = true;
}
} catch {
// Corrupt or unreadable info.json — treat as reclaimable when within maxReclaims.
Eif (reclaimAttempts < maxReclaims) {
try {
fs.rmSync(lockPath, { recursive: true, force: true });
reclaimed = true;
} catch {
// If we cannot remove the corrupt lock, treat as held.
}
}
}
} else if (EreclaimAttempts < maxReclaims) {
// Lock directory exists but info.json is missing — orphaned lock.
// Reclaim it so workflows aren't blocked indefinitely.
try {
fs.rmSync(lockPath, { recursive: true, force: true });
reclaimed = true;
} catch {
// If we cannot remove the orphaned lock, treat as held.
}
}
if (!reclaimed) return false;
}
}
return false;
}
/**
* Release a held lock.
*/
releaseLock(type: string, date: string): void {
const lockPath: string = this.getLockPath(type, date);
try {
fs.rmSync(lockPath, { recursive: true, force: true });
} catch {
// Ignore errors during cleanup
}
}
/**
* Check if a lock is currently held.
*/
isLocked(type: string, date: string): boolean {
const lockPath: string = this.getLockPath(type, date);
return fs.existsSync(lockPath);
}
/**
* Read lock information if the lock exists.
*/
getLockInfo(type: string, date: string): LockInfo | null {
const infoPath: string = path.join(this.getLockPath(type, date), 'info.json');
try {
if (fs.existsSync(infoPath)) {
return JSON.parse(fs.readFileSync(infoPath, 'utf-8')) as LockInfo;
}
} catch {
// Ignore read errors
}
return null;
}
/**
* Remove all locks older than the configured timeout.
*
* @returns Number of stale locks cleaned up
*/
cleanupStaleLocks(): number {
let cleaned: number = 0;
try {
if (!fs.existsSync(this.lockDir)) return 0;
const entries: string[] = fs.readdirSync(this.lockDir);
for (const entry of entries) {
Iif (!entry.endsWith('.lock')) continue;
const lockPath: string = path.join(this.lockDir, entry);
const infoPath: string = path.join(lockPath, 'info.json');
try {
if (fs.existsSync(infoPath)) {
const info: LockInfo = JSON.parse(fs.readFileSync(infoPath, 'utf-8')) as LockInfo;
const acquiredAtMs: number = new Date(info.acquiredAt).getTime();
const explicitExpiry: unknown = info.expiresAfterMs;
const hasExplicitExpiry: boolean = explicitExpiry !== undefined;
const expiryMs: number =
hasExplicitExpiry && typeof explicitExpiry === 'number'
? explicitExpiry
: this.timeoutMs;
const isAcquiredAtFinite: boolean = Number.isFinite(acquiredAtMs);
const hasInvalidExplicitExpiry: boolean =
hasExplicitExpiry &&
!(typeof explicitExpiry === 'number' && Number.isFinite(explicitExpiry) && explicitExpiry > 0);
// Treat invalid timestamp/expiry as corrupt and reclaimable.
if (
!isAcquiredAtFinite ||
hasInvalidExplicitExpiry ||
Date.now() - acquiredAtMs > expiryMs
) {
fs.rmSync(lockPath, { recursive: true, force: true });
cleaned++;
}
} else {
// No info.json — remove orphaned lock directory
fs.rmSync(lockPath, { recursive: true, force: true });
cleaned++;
}
} catch {
// Treat unreadable/corrupt info.json or other per-lock errors as invalid lock;
// attempt best-effort removal so stale/corrupt locks do not block new workflows.
try {
fs.rmSync(lockPath, { recursive: true, force: true });
cleaned++;
} catch {
// Ignore errors during lock directory removal
}
}
}
} catch {
// Ignore overall cleanup errors
}
return cleaned;
}
}
/**
* Workflow State Coordinator
*/
export class WorkflowStateCoordinator {
/** Absolute path to the persisted JSON state file. */
private stateFilePath: string;
/** In-memory working copy of the coordinator state. */
private state: WorkflowState;
constructor(stateFilePath: string = STATE_FILE) {
  this.stateFilePath = stateFilePath;
  // Initialize every field up front so the state shape is always complete,
  // even before load() has run or when the state file does not exist yet.
  this.state = {
    lastUpdate: null,
    recentArticles: [],
    mcpQueryCache: {},
    workflows: {},
    activeGenerations: [],
  };
}
/**
* Load state from disk
*/
async load(): Promise<void> {
try {
if (fs.existsSync(this.stateFilePath)) {
const content: string = fs.readFileSync(this.stateFilePath, 'utf-8');
const parsed: unknown = JSON.parse(content);
const raw: Partial<WorkflowState> =
isPlainObject(parsed) ? (parsed as Partial<WorkflowState>) : {};
// Normalize all required fields so legacy/partial state files
// don't cause runtime errors when accessed later.
this.state = {
lastUpdate: typeof raw.lastUpdate === 'string' ? raw.lastUpdate : null,
recentArticles: Array.isArray(raw.recentArticles) ? raw.recentArticles : [],
mcpQueryCache: isPlainObject(raw.mcpQueryCache)
? (raw.mcpQueryCache as Record<string, MCPCacheEntry>)
: {},
workflows: isPlainObject(raw.workflows)
? (raw.workflows as Record<string, WorkflowRecord>)
: {},
activeGenerations: Array.isArray(raw.activeGenerations) ? raw.activeGenerations : [],
};
this.cleanupExpiredEntries();
} else {
// Initialize empty state
await this.save();
}
} catch (error: unknown) {
const message: string = error instanceof Error ? error.message : String(error);
console.warn('Warning: Could not load workflow state:', message);
// Continue with empty state
}
}
/**
 * Save state to disk using atomic write (write-to-tmp + rename).
 *
 * Writes to `<state>.tmp.<pid>` first, then renames over the real file so a
 * crash mid-write can never leave a truncated state file. On platforms where
 * rename-over-existing fails (notably Windows), falls back to
 * backup-then-rename and restores the backup if the retry also fails.
 *
 * @throws Re-throws any unrecoverable filesystem error after logging.
 */
async save(): Promise<void> {
  try {
    const dir: string = path.dirname(this.stateFilePath);
    if (!fs.existsSync(dir)) {
      fs.mkdirSync(dir, { recursive: true });
    }
    this.state.lastUpdate = new Date().toISOString();
    // PID-suffixed tmp name avoids collisions between concurrent processes.
    const tmpPath: string = `${this.stateFilePath}.tmp.${process.pid}`;
    fs.writeFileSync(tmpPath, JSON.stringify(this.state, null, 2), 'utf-8');
    try {
      fs.renameSync(tmpPath, this.stateFilePath);
    } catch (renameErr: unknown) {
      // On Windows, renameSync can fail when the destination already exists.
      // For known retriable codes, use backup-then-rename so last good state
      // remains recoverable if retry fails.
      const code: string | undefined = (renameErr as NodeJS.ErrnoException).code;
      if (code && RETRIABLE_RENAME_CODES.has(code)) {
        const backupPath: string = `${this.stateFilePath}.bak`;
        let hadExisting: boolean = false;
        try {
          if (fs.existsSync(this.stateFilePath)) {
            hadExisting = true;
            // Clear any leftover backup from a previous failed save.
            if (fs.existsSync(backupPath)) {
              fs.unlinkSync(backupPath);
            }
            fs.renameSync(this.stateFilePath, backupPath);
          }
          fs.renameSync(tmpPath, this.stateFilePath);
          if (hadExisting) {
            // New state is in place; the backup is no longer needed.
            try { fs.unlinkSync(backupPath); } catch { /* ignore backup cleanup error */ }
          }
          return;
        } catch (retryErr: unknown) {
          // Restore previous state if we moved it to backup but failed to write new state.
          try {
            if (hadExisting && !fs.existsSync(this.stateFilePath) && fs.existsSync(backupPath)) {
              fs.renameSync(backupPath, this.stateFilePath);
            }
          } catch (restoreErr: unknown) {
            const restoreMessage: string = restoreErr instanceof Error ? restoreErr.message : String(restoreErr);
            console.warn(`Warning: Failed to restore workflow state backup at ${backupPath}: ${restoreMessage}`);
          }
          // Best-effort cleanup of tmp file after failed retry
          try { fs.unlinkSync(tmpPath); } catch { /* ignore cleanup error */ }
          throw retryErr;
        }
      }
      // Non-retriable rename failure: clean up tmp file and rethrow
      try { fs.unlinkSync(tmpPath); } catch { /* ignore cleanup error */ }
      throw renameErr;
    }
  } catch (error: unknown) {
    const message: string = error instanceof Error ? error.message : String(error);
    console.error('Error saving workflow state:', message);
    throw error;
  }
}
/**
* Clean up expired cache entries and old articles
*/
/**
 * Clean up expired cache entries, old articles, and stale generation markers.
 *
 * Removes: MCP cache entries past their per-entry TTL (default 2h), recent
 * articles older than the 6-hour dedup window, and active-generation markers
 * older than the 45-minute lease. Entries with unparseable timestamps are
 * treated as expired.
 */
cleanupExpiredEntries(): void {
  const now: number = Date.now();
  // Clean MCP cache using per-entry TTL (default: MCP_CACHE_TTL_SECONDS, 2 hours)
  Object.keys(this.state.mcpQueryCache).forEach((key: string) => {
    const entry: MCPCacheEntry | undefined = this.state.mcpQueryCache[key];
    if (!entry) {
      // Defensive: drop keys that somehow map to no entry.
      delete this.state.mcpQueryCache[key];
      return;
    }
    const entryTime: number = new Date(entry.timestamp).getTime();
    // If timestamp is invalid (NaN), treat as expired and delete
    if (isNaN(entryTime)) {
      delete this.state.mcpQueryCache[key];
      return;
    }
    const effectiveTtlSeconds: number =
      typeof entry.ttl === 'number' && entry.ttl > 0
        ? entry.ttl
        : MCP_CACHE_TTL_SECONDS;
    if (now - entryTime > effectiveTtlSeconds * 1000) {
      delete this.state.mcpQueryCache[key];
    }
  });
  // Clean recent articles (6-hour TTL)
  this.state.recentArticles = this.state.recentArticles.filter((article: RecentArticleEntry) => {
    const articleTime: number = new Date(article.timestamp).getTime();
    // If timestamp is invalid (NaN), treat as expired and exclude
    if (isNaN(articleTime)) {
      return false;
    }
    return (now - articleTime) <= RECENT_ARTICLE_TTL_SECONDS * 1000;
  });
  // Clean stale active generations (45-minute lease)
  if (this.state.activeGenerations) {
    this.state.activeGenerations = this.state.activeGenerations.filter((generation: ActiveGeneration) => {
      const startedAt: number = new Date(generation.startedAt).getTime();
      if (isNaN(startedAt)) {
        return false;
      }
      return (now - startedAt) <= ACTIVE_GENERATION_TTL_MS;
    });
  }
}
/**
* Cache MCP query result with adaptive TTL.
*
* @param queryKey - Unique identifier for the query
* @param result - Query result to cache
* @param ttl - Time to live in seconds (default: adaptive based on plenary hours)
*/
async cacheMCPQuery(queryKey: string, result: unknown, ttl?: number): Promise<void> {
  // Default TTL adapts to Riksdag plenary hours (2h during, 4h outside).
  const effectiveTtl: number = ttl ?? getAdaptiveCacheTTL();
  const entry: MCPCacheEntry = {
    timestamp: new Date().toISOString(),
    ttl: effectiveTtl,
    // Hash stored alongside the result for cache-comparison purposes.
    resultHash: this.hashObject(result),
    result,
  };
  this.state.mcpQueryCache[queryKey] = entry;
  await this.save();
}
/**
* Get cached MCP query result
*
* @param queryKey - Unique identifier for the query
* @returns Cached result or null if expired/missing
*/
/**
 * Get cached MCP query result.
 *
 * Runs a cleanup pass first, then re-checks the entry's TTL defensively
 * before returning it.
 *
 * @param queryKey - Unique identifier for the query
 * @returns Cached result or null if expired/missing
 */
getCachedMCPQuery(queryKey: string): unknown | null {
  this.cleanupExpiredEntries();
  const entry: MCPCacheEntry | undefined = this.state.mcpQueryCache[queryKey];
  if (!entry) return null;
  const now: number = Date.now();
  const entryTime: number = new Date(entry.timestamp).getTime();
  // Use per-entry TTL with fallback to default constant
  const effectiveTtlSeconds: number =
    typeof entry.ttl === 'number' && entry.ttl > 0
      ? entry.ttl
      : MCP_CACHE_TTL_SECONDS;
  // Defensive re-check: cleanupExpiredEntries() should already have removed
  // expired entries, but guard against clock movement between calls.
  if (now - entryTime > effectiveTtlSeconds * 1000) {
    delete this.state.mcpQueryCache[queryKey];
    return null;
  }
  return entry.result;
}
/**
* Add recent article to tracking
*
* @param article - Article metadata
*/
async addRecentArticle(article: RecentArticleInput): Promise<void> {
  // Copy array fields defensively so later caller-side mutations cannot
  // corrupt the tracked state.
  const entry: RecentArticleEntry = {
    slug: article.slug,
    timestamp: new Date().toISOString(),
    workflow: article.workflow ?? 'unknown',
    title: article.title,
    topics: article.topics ? [...article.topics] : [],
    mcpQueries: article.mcpQueries ? [...article.mcpQueries] : [],
    significance: article.significance,
  };
  this.state.recentArticles = [...this.state.recentArticles, entry];
  await this.save();
}
/**
* Check if article is duplicate based on similarity.
*
* Uses both weighted title/topic/source similarity (≥ 0.70 threshold)
* and Jaccard topic-only similarity (≥ 0.50 threshold) to catch
* same-topic articles with different titles.
*
* @param title - Article title
* @param topics - Article topics
* @param mcpQueries - MCP query keys used for this article
* @param significance - Optional political significance score (0-100).
* When provided and ≥ 80, a same-topic article with lower significance
* is NOT treated as a duplicate — the high-significance version overrides.
* @returns Duplicate check result with similarity score
*/
async checkDuplicateArticle(
  title: string,
  topics: string[] = [],
  mcpQueries: string[] = [],
  significance?: number,
): Promise<DuplicateCheckResult> {
  // Prune articles outside the 6-hour window before comparing.
  this.cleanupExpiredEntries();
  // Track both:
  // 1) maxSimilarity/matchedArticle: highest similarity overall (for reporting)
  // 2) bestDuplicateScore/duplicateMatchedArticle: highest score among entries
  //    that actually satisfy duplicate criteria (combined>=0.70 OR topic>=0.50)
  let maxSimilarity: number = 0;
  let matchedArticle: RecentArticleEntry | null = null;
  let bestDuplicateScore: number = -1;
  let duplicateMatchedArticle: RecentArticleEntry | null = null;
  let isDuplicate: boolean = false;
  const similarMatches: RecentArticleEntry[] = [];
  for (const recentArticle of this.state.recentArticles) {
    // Weighted combined similarity (title 50%, topics 30%, sources 20%)
    const combinedSimilarity: number = this.calculateSimilarity(
      title,
      topics,
      mcpQueries,
      recentArticle.title,
      [...recentArticle.topics],
      [...recentArticle.mcpQueries],
    );
    // Jaccard topic-only similarity for semantic deduplication
    const topicJaccard: number = jaccardTopicSimilarity(topics, recentArticle.topics);
    // Reported score is the stronger of the two signals.
    const effectiveSimilarity: number = Math.max(combinedSimilarity, topicJaccard);
    const duplicateByCombined: boolean = combinedSimilarity >= SIMILARITY_THRESHOLD;
    const duplicateByTopic: boolean = topicJaccard >= TOPIC_JACCARD_THRESHOLD;
    const currentIsDuplicate: boolean = duplicateByCombined || duplicateByTopic;
    if (effectiveSimilarity > maxSimilarity) {
      maxSimilarity = effectiveSimilarity;
      matchedArticle = recentArticle;
    }
    if (currentIsDuplicate) {
      // Collect every duplicate candidate for the significance override below.
      similarMatches.push(recentArticle);
    }
    if (currentIsDuplicate && effectiveSimilarity > bestDuplicateScore) {
      bestDuplicateScore = effectiveSimilarity;
      duplicateMatchedArticle = recentArticle;
      isDuplicate = true;
    }
  }
  // High-significance override: if the new article has significance ≥ 80
  // and the matched article either has no numeric significance or a score < 80,
  // treat the matched article as lower/unknown significance and allow the new
  // article to be published alongside the existing one (isDuplicate = false).
  if (
    isDuplicate &&
    typeof significance === 'number' &&
    significance >= 80 &&
    !similarMatches.some((article: RecentArticleEntry) =>
      typeof article.significance === 'number' && article.significance >= 80
    )
  ) {
    return {
      isDuplicate: false,
      matchedArticle: null,
      similarityScore: maxSimilarity,
    };
  }
  return {
    isDuplicate,
    matchedArticle: isDuplicate ? (duplicateMatchedArticle ?? matchedArticle) : null,
    similarityScore: isDuplicate ? bestDuplicateScore : maxSimilarity,
  };
}
/**
* Calculate similarity between two articles
*
* Uses weighted combination of:
* - Title similarity (50%)
* - Topic overlap (30%)
* - MCP query overlap (20%)
*
* @returns Similarity score 0.0-1.0
*/
calculateSimilarity(
  title1: string,
  topics1: string[],
  mcpQueries1: string[],
  title2: string,
  topics2: string[],
  mcpQueries2: string[],
): number {
  // Weighted blend: titles dominate (50%), then topics (30%), then MCP
  // query overlap (20%) as a proxy for shared source material.
  const titleComponent: number = this.stringSimilarity(title1, title2) * 0.5;
  const topicComponent: number = this.setOverlap(topics1, topics2) * 0.3;
  const sourceComponent: number = this.setOverlap(mcpQueries1, mcpQueries2) * 0.2;
  return titleComponent + topicComponent + sourceComponent;
}
/**
 * Word-level string similarity: Jaccard overlap of the two strings'
 * lowercase word sets, ignoring words of 2 characters or fewer.
 *
 * @param str1 - First string
 * @param str2 - Second string
 * @returns Similarity 0.0-1.0 (0 when either string is empty)
 */
stringSimilarity(str1: string, str2: string): number {
  if (!str1 || !str2) return 0;
  // Tokenize: lowercase, split on whitespace, drop short words, dedupe.
  const tokenize = (text: string): string[] => [
    ...new Set(text.toLowerCase().split(/\s+/).filter((word: string) => word.length > 2)),
  ];
  return this.setOverlap(tokenize(str1), tokenize(str2));
}
/**
 * Jaccard similarity of two collections, compared case-insensitively
 * after string coercion.
 *
 * @param set1 - First collection
 * @param set2 - Second collection
 * @returns |intersection| / |union|, or 0 when either input is empty or missing
 */
setOverlap(set1: unknown[], set2: unknown[]): number {
  if (!set1?.length || !set2?.length) return 0;
  const normalize = (items: unknown[]): Set<string> =>
    new Set(items.map((item: unknown) => String(item).toLowerCase()));
  const left: Set<string> = normalize(set1);
  const right: Set<string> = normalize(set2);
  // Count shared members once, then derive the union size by
  // inclusion-exclusion instead of materializing a union set.
  let shared = 0;
  for (const value of left) {
    if (right.has(value)) shared++;
  }
  return shared / (left.size + right.size - shared);
}
/**
 * Hash a value for cache comparison.
 *
 * Serialization is key-order independent: a replacer function sorts the
 * keys of every plain object it encounters, at any depth. The previous
 * implementation passed a string ARRAY of top-level keys as the replacer,
 * but JSON.stringify treats an array replacer as a key allow-list at
 * EVERY depth — nested keys absent from the top level were silently
 * dropped, so distinct objects could collide (e.g. {a:{x:1}} and
 * {a:{y:2}} both serialized to '{"a":{}}'). Fixing this changes hashes
 * for nested objects, which at worst causes a one-time cache miss.
 *
 * @param obj - Value to hash (any JSON-serializable value)
 * @returns SHA-256 hash (first 16 hex chars)
 */
hashObject(obj: unknown): string {
  // The replacer runs after toJSON (e.g. Date → ISO string), so such
  // values pass through unchanged; arrays keep their element order.
  const str: string = JSON.stringify(obj, (_key: string, value: unknown) => {
    if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
      const sorted: Record<string, unknown> = {};
      for (const k of Object.keys(value as Record<string, unknown>).sort()) {
        sorted[k] = (value as Record<string, unknown>)[k];
      }
      return sorted;
    }
    return value;
  });
  return crypto.createHash('sha256').update(str).digest('hex').substring(0, 16);
}
/**
* Record workflow execution
*
* @param workflowName - Name of workflow
* @param metadata - Execution metadata
*/
async recordWorkflowExecution(
workflowName: string,
metadata: WorkflowExecutionMetadata = {},
): Promise<void> {
if (!this.state.workflows[workflowName]) {
this.state.workflows[workflowName] = {
lastRun: null,
runCount: 0,
articlesGenerated: 0,
};
}
const record: WorkflowRecord = this.state.workflows[workflowName];
record.lastRun = new Date().toISOString();
record.runCount++;
if (metadata.articlesGenerated) {
record.articlesGenerated += metadata.articlesGenerated;
}
await this.save();
}
/**
 * Register an active generation for cross-workflow visibility.
 * Expired entries are purged first; registering an identical
 * (workflowId, type, date) triple twice is a no-op.
 */
async registerActiveGeneration(workflowId: string, type: string, date: string): Promise<void> {
  this.cleanupExpiredEntries();
  if (!this.state.activeGenerations) {
    this.state.activeGenerations = [];
  }
  const generations: ActiveGeneration[] = this.state.activeGenerations;
  const matchesRequest = (g: ActiveGeneration): boolean =>
    g.workflowId === workflowId && g.type === type && g.date === date;
  if (generations.some(matchesRequest)) {
    return;
  }
  generations.push({
    workflowId,
    type,
    date,
    startedAt: new Date().toISOString(),
  });
  await this.save();
}
/**
* Unregister an active generation when done.
*/
async unregisterActiveGeneration(workflowId: string, type: string, date: string): Promise<void> {
Iif (!this.state.activeGenerations) return;
this.state.activeGenerations = this.state.activeGenerations.filter(
(g: ActiveGeneration) => !(g.workflowId === workflowId && g.type === type && g.date === date),
);
await this.save();
}
/**
 * Get active generations for cross-workflow visibility.
 *
 * @returns The tracked active generations (empty array when none)
 */
getActiveGenerations(): ActiveGeneration[] {
  const { activeGenerations } = this.state;
  // == null deliberately matches both null and undefined.
  return activeGenerations == null ? [] : activeGenerations;
}
/**
 * Get recent articles from the last N hours, after purging expired
 * state entries.
 *
 * @param hours - Hours to look back (default 6)
 * @returns Articles whose timestamp falls within the window
 */
getRecentArticles(hours: number = 6): RecentArticleEntry[] {
  this.cleanupExpiredEntries();
  const cutoffMs: number = Date.now() - hours * 60 * 60 * 1000;
  // NaN timestamps (unparseable) fail the comparison and are excluded,
  // matching Date-object comparison semantics.
  return this.state.recentArticles.filter(
    (article: RecentArticleEntry) => new Date(article.timestamp).getTime() >= cutoffMs,
  );
}
/**
 * Get workflow statistics.
 *
 * @returns Per-workflow records plus overall cache size and
 *   recent-article count
 */
getWorkflowStatistics(): WorkflowStatistics {
  const cacheSize: number = Object.keys(this.state.mcpQueryCache).length;
  const recentArticlesCount: number = this.state.recentArticles.length;
  return { ...this.state.workflows, cacheSize, recentArticlesCount };
}
}
// Export for direct usage
export {
MCP_CACHE_TTL_SECONDS,
MCP_CACHE_TTL_NON_PLENARY_SECONDS,
RECENT_ARTICLE_TTL_SECONDS,
SIMILARITY_THRESHOLD,
TOPIC_JACCARD_THRESHOLD,
LOCK_TIMEOUT_MS,
ACTIVE_GENERATION_TTL_MS,
};
|