GarlicBreadcast
“‘O ppane c’aglio nun se perde maje.” — Il pane all’aglio non va mai sprecato. Lo stesso vale per i messaggi GarlicBreadcast.
GarlicBreadcast è il sistema di messaggistica publish/subscribe di Pasta Protocol. Ogni messaggio pubblicato si diffonde a tutti i nodi sottoscritti con la stessa inarrestabilità del profumo d’aglio nel forno: attraversa le porte, penetra nei muri, raggiunge ogni angolo del cluster prima che tu te ne accorga.
GarlicBreadcast è pensato per messaggistica ad alta frequenza, notifiche di stato e propagazione di eventi. Non è un sistema di code: i messaggi non sono garantiti persistenti per default. Per operazioni che richiedono durabilità, si usa il Protocollo Ragu.
Concetti Fondamentali
| Concetto | Descrizione |
|---|---|
| Topic | Canale identificato da una stringa gerarchica (cluster.nodi.connessi) |
| Broadcast | Un messaggio pubblicato su un topic |
| Subscriber | Un nodo o handler iscritto a uno o più topic |
| Paniera | Buffer locale di messaggi non ancora processati |
| Scarpetta | La capacità di recuperare messaggi recenti già pubblicati |
Installazione e inizializzazione
import { GarlicBreadcast } from 'pasta-protocol';
const gbc = new GarlicBreadcast({ nodeId: 'napoli-primary-01', cluster: clusterRef, paniera: { maxSize: 1000, ttl: 60_000, // ms }, scarpetta: { enabled: true, lookback: 5000, // recupera ultimi 5s di messaggi al subscribe },});
await gbc.connect();Publish
La funzione publish è il modo principale per inviare un messaggio. È non-bloccante per default: il messaggio viene messo in coda e inviato in modo asincrono.
// Publish baseawait gbc.publish('cluster.nodi.connessi', { nodeId: 'palermo-03', region: 'eu-south-1', timestamp: Date.now(),});
// Publish con opzioniawait gbc.publish( 'metriche.cpu', { nodeId: 'napoli-02', value: 0.73 }, { ttl: 10_000, // il messaggio scade dopo 10s priority: 'alta', // alta | normale | bassa sticky: false, // non memorizzare per la scarpetta });Differenze rispetto a un fire-and-forget:
await gbc.publish('topic', payload); // fire and forget, nessuna confermaconst ack = await gbc.publishAndWait('topic', payload, { timeout: 3000 });// ack.delivered === true se almeno un subscriber ha ricevuto il messaggio// ack.deliveredTo contiene la lista dei nodeId che hanno confermatoSubscribe
// Subscription sempliceconst unsubscribe = gbc.subscribe('cluster.nodi.connessi', (msg) => { console.log(`Nodo connesso: ${msg.payload.nodeId}`);});
// Wildcards nei topicconst unsubAny = gbc.subscribe('cluster.nodi.*', (msg) => { // riceve 'cluster.nodi.connessi', 'cluster.nodi.disconnessi', ecc. console.log(`Evento nodo [${msg.topic}]: ${msg.payload.nodeId}`);});
// Subscription tipizzatainterface MetricaCpu { nodeId: string; value: number;}
const unsubMetriche = gbc.subscribe<MetricaCpu>('metriche.cpu', (msg) => { if (msg.payload.value > 0.9) { console.warn(`PEPERONCINO: CPU alta su ${msg.payload.nodeId}: ${msg.payload.value * 100}%`); }});
// Scarpetta: recupera i messaggi recenti al momento dell'iscrizioneconst unsubConScarpetta = gbc.subscribe('cluster.eventi', handler, { scarpetta: true, // ricevi gli ultimi 5s di messaggi già pubblicati});Unsubscribe
// Ogni subscribe restituisce una funzione unsubscribeconst unsubscribe = gbc.subscribe('topic', handler);
// Quando non serve piùunsubscribe();
// Oppure con il metodo esplicitogbc.unsubscribe('topic', handler);
// Cancella tutte le subscription di questo nodoawait gbc.disconnect();Gestione degli errori
GarlicBreadcast utilizza la scala di gravità standard di Pasta Protocol:
| Livello | Quando | Comportamento |
|---|---|---|
BRUSCHETTA | Messaggio non consegnato a un subscriber lento | Log + retry automatico |
PEPERONCINO | Paniera piena | Scarto messaggi più vecchi + avviso |
VESUVIO | Connessione al cluster persa | Tentativo di riconnessione + buffer locale |
TERREMOTO | Cluster irraggiungibile da > 30s | Shutdown ordinato del nodo |
gbc.on('error', (error) => { switch (error.level) { case 'BRUSCHETTA': // log e ignora break; case 'PEPERONCINO': metrics.increment('gbc.paniera.overflow'); break; case 'VESUVIO': await alerting.notify('GarlicBreadcast disconnesso'); break; case 'TERREMOTO': await cluster.emergencyShutdown(error); break; }});Topic naming
I topic seguono una gerarchia puntata, simile a un percorso di file system. La convenzione raccomandata:
{dominio}.{risorsa}.{evento}
Esempi: cluster.nodi.connessi cluster.nodi.disconnessi metriche.cpu metriche.memoria.heap consenso.pesto.round-completato saga.onboarding.step-completatoI topic che iniziano con $ sono riservati al sistema interno e non possono essere pubblicati direttamente dall’utente:
$cluster.heartbeat$consensus.internal$gbc.diagnosticsDiagnostica
// Statistiche in tempo realeconst stats = await gbc.stats();console.log(`Messaggi pubblicati: ${stats.published}`);console.log(`Messaggi ricevuti: ${stats.received}`);console.log(`Paniera corrente: ${stats.paniereSize}/${stats.paniereMax}`);console.log(`Subscriber attivi: ${stats.activeSubscribers}`);
// Lista dei topic attiviconst topics = await gbc.listTopics();topics.forEach(t => console.log(`${t.name}: ${t.subscriberCount} subscriber`));