Skip to content

GarlicBreadcast

“‘O ppane c’aglio nun se perde maje.” — Il pane all’aglio non va mai sprecato. Lo stesso vale per i messaggi GarlicBreadcast.

GarlicBreadcast è il sistema di messaggistica publish/subscribe di Pasta Protocol. Ogni messaggio pubblicato si diffonde a tutti i nodi sottoscritti con la stessa inarrestabilità del profumo d’aglio nel forno: attraversa le porte, penetra nei muri, raggiunge ogni angolo del cluster prima che tu te ne accorga.

GarlicBreadcast è pensato per messaggistica ad alta frequenza, notifiche di stato e propagazione di eventi. Non è un sistema di code: i messaggi non sono garantiti persistenti per default. Per operazioni che richiedono durabilità, si usa il Protocollo Ragu.

Concetti Fondamentali

ConcettoDescrizione
TopicCanale identificato da una stringa gerarchica (cluster.nodi.connessi)
BroadcastUn messaggio pubblicato su un topic
SubscriberUn nodo o handler iscritto a uno o più topic
PanieraBuffer locale di messaggi non ancora processati
ScarpettaLa capacità di recuperare messaggi recenti già pubblicati

Installazione e inizializzazione

import { GarlicBreadcast } from 'pasta-protocol';
const gbc = new GarlicBreadcast({
nodeId: 'napoli-primary-01',
cluster: clusterRef,
paniera: {
maxSize: 1000,
ttl: 60_000, // ms
},
scarpetta: {
enabled: true,
lookback: 5000, // recupera ultimi 5s di messaggi al subscribe
},
});
await gbc.connect();

Publish

La funzione publish è il modo principale per inviare un messaggio. È non-bloccante per default: il messaggio viene messo in coda e inviato in modo asincrono.

// Publish base
await gbc.publish('cluster.nodi.connessi', {
nodeId: 'palermo-03',
region: 'eu-south-1',
timestamp: Date.now(),
});
// Publish con opzioni
await gbc.publish(
'metriche.cpu',
{ nodeId: 'napoli-02', value: 0.73 },
{
ttl: 10_000, // il messaggio scade dopo 10s
priority: 'alta', // alta | normale | bassa
sticky: false, // non memorizzare per la scarpetta
}
);

Differenze rispetto a un fire-and-forget:

await gbc.publish('topic', payload); // fire and forget, nessuna conferma
const ack = await gbc.publishAndWait('topic', payload, { timeout: 3000 });
// ack.delivered === true se almeno un subscriber ha ricevuto il messaggio
// ack.deliveredTo contiene la lista dei nodeId che hanno confermato

Subscribe

// Subscription semplice
const unsubscribe = gbc.subscribe('cluster.nodi.connessi', (msg) => {
console.log(`Nodo connesso: ${msg.payload.nodeId}`);
});
// Wildcards nei topic
const unsubAny = gbc.subscribe('cluster.nodi.*', (msg) => {
// riceve 'cluster.nodi.connessi', 'cluster.nodi.disconnessi', ecc.
console.log(`Evento nodo [${msg.topic}]: ${msg.payload.nodeId}`);
});
// Subscription tipizzata
interface MetricaCpu {
nodeId: string;
value: number;
}
const unsubMetriche = gbc.subscribe<MetricaCpu>('metriche.cpu', (msg) => {
if (msg.payload.value > 0.9) {
console.warn(`PEPERONCINO: CPU alta su ${msg.payload.nodeId}: ${msg.payload.value * 100}%`);
}
});
// Scarpetta: recupera i messaggi recenti al momento dell'iscrizione
const unsubConScarpetta = gbc.subscribe('cluster.eventi', handler, {
scarpetta: true, // ricevi gli ultimi 5s di messaggi già pubblicati
});

Unsubscribe

// Ogni subscribe restituisce una funzione unsubscribe
const unsubscribe = gbc.subscribe('topic', handler);
// Quando non serve più
unsubscribe();
// Oppure con il metodo esplicito
gbc.unsubscribe('topic', handler);
// Cancella tutte le subscription di questo nodo
await gbc.disconnect();

Gestione degli errori

GarlicBreadcast utilizza la scala di gravità standard di Pasta Protocol:

LivelloQuandoComportamento
BRUSCHETTAMessaggio non consegnato a un subscriber lentoLog + retry automatico
PEPERONCINOPaniera pienaScarto messaggi più vecchi + avviso
VESUVIOConnessione al cluster persaTentativo di riconnessione + buffer locale
TERREMOTOCluster irraggiungibile da > 30sShutdown ordinato del nodo
gbc.on('error', (error) => {
switch (error.level) {
case 'BRUSCHETTA':
// log e ignora
break;
case 'PEPERONCINO':
metrics.increment('gbc.paniera.overflow');
break;
case 'VESUVIO':
await alerting.notify('GarlicBreadcast disconnesso');
break;
case 'TERREMOTO':
await cluster.emergencyShutdown(error);
break;
}
});

Topic naming

I topic seguono una gerarchia puntata, simile a un percorso di file system. La convenzione raccomandata:

{dominio}.{risorsa}.{evento}
Esempi:
cluster.nodi.connessi
cluster.nodi.disconnessi
metriche.cpu
metriche.memoria.heap
consenso.pesto.round-completato
saga.onboarding.step-completato

I topic che iniziano con $ sono riservati al sistema interno e non possono essere pubblicati direttamente dall’utente:

$cluster.heartbeat
$consensus.internal
$gbc.diagnostics

Diagnostica

// Statistiche in tempo reale
const stats = await gbc.stats();
console.log(`Messaggi pubblicati: ${stats.published}`);
console.log(`Messaggi ricevuti: ${stats.received}`);
console.log(`Paniera corrente: ${stats.paniereSize}/${stats.paniereMax}`);
console.log(`Subscriber attivi: ${stats.activeSubscribers}`);
// Lista dei topic attivi
const topics = await gbc.listTopics();
topics.forEach(t => console.log(`${t.name}: ${t.subscriberCount} subscriber`));