Wie verwende ich Reactor Core für das Datenstreaming in Google Cloud Pub/Sub?

Nov 14, 2025Eine Nachricht hinterlassen

Als Reactor Core-Anbieter freue ich mich, Ihnen mitzuteilen, wie Sie Reactor Core für das Datenstreaming in Google Cloud Pub/Sub nutzen können. Google Cloud Pub/Sub ist ein vollständig verwalteter Echtzeit-Messaging-Dienst, der Ihnen das Senden und Empfangen von Nachrichten zwischen unabhängigen Anwendungen ermöglicht. Reactor Core hingegen ist eine reaktive Programmierbibliothek für die JVM, die eine API zum Erstellen asynchroner und ereignisgesteuerter Anwendungen bereitstellt. Die Kombination dieser beiden Technologien kann erhebliche Vorteile für Ihre Datenstreaming-Anwendungen bringen.

Grundlegendes zu Google Cloud Pub/Sub

Google Cloud Pub/Sub basiert auf einem Publish-Subscribe-Modell. Bei diesem Modell senden Herausgeber Nachrichten an Themen und Abonnenten erhalten Nachrichten von Abonnements. Ein Thema ist ein logischer Kanal, über den Nachrichten gesendet werden, und ein Abonnement ist ein Endpunkt, der Nachrichten von einem Thema empfängt.

Zu den Hauptfunktionen von Google Cloud Pub/Sub gehören:

  • Skalierbarkeit: Es kann eine große Anzahl von Nachrichten pro Sekunde verarbeiten und eignet sich daher für Szenarios mit hohem Datenvolumen-Streaming.
  • Zuverlässigkeit: Nachrichten werden dauerhaft gespeichert und Pub/Sub gewährleistet eine mindestens einmalige Zustellung.
  • Flexibilität: Es unterstützt mehrere Programmiersprachen und kann in verschiedene Google Cloud-Dienste integriert werden.

Warum Reactor Core mit Google Cloud Pub/Sub verwenden?

Reactor Core bietet ein reaktives Programmiermodell, das sich gut für die Verarbeitung asynchroner und nicht blockierender Vorgänge eignet. Bei Verwendung mit Google Cloud Pub/Sub kann es folgende Vorteile bieten:

  • Asynchrone Verarbeitung: Mit Reactor Core können Sie Nachrichten von Pub/Sub asynchron verarbeiten, was bedeutet, dass Ihre Anwendung weiterhin andere Aufgaben ausführen kann, während sie auf Nachrichten wartet. Dies kann den Gesamtdurchsatz Ihrer Anwendung verbessern.
  • Gegendruckmanagement: Der Reaktorkern verfügt über eine integrierte Gegendruckunterstützung. Im Kontext von Pub/Sub bedeutet dies: Wenn Ihre Anwendung Nachrichten nicht so schnell verarbeiten kann, wie sie eintreffen, kann sie Pub/Sub signalisieren, die Nachrichtenübermittlungsrate zu verlangsamen.
  • Zusammensetzbare Streams: Sie können in Reactor Core ganz einfach verschiedene reaktive Streams zusammenstellen. Beispielsweise können Sie Nachrichten aus Pub/Sub vor der weiteren Verarbeitung transformieren, filtern oder aggregieren.

Einrichten der Umgebung

Bevor Sie Reactor Core mit Google Cloud Pub/Sub verwenden können, müssen Sie Ihre Entwicklungsumgebung einrichten.

Voraussetzungen

  • Google Cloud-Konto: Sie müssen über ein Google Cloud-Konto verfügen und die Pub/Sub-API aktivieren.
  • Java Development Kit (JDK): Reactor Core ist eine Java-Bibliothek, daher muss JDK 8 oder höher installiert sein.
  • Maven oder Gradle: Sie können entweder Maven oder Gradle verwenden, um Ihre Java-Abhängigkeiten zu verwalten.

Abhängigkeiten hinzufügen

Wenn Sie Maven verwenden, fügen Sie die folgenden Abhängigkeiten zu Ihrem hinzupom.xml:

<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor – core</artifactId> <version>3.4.15</version> </dependency> <dependency> <groupId>com.google.cloud</groupId> <artifactId>google – cloud – pubsub</artifactId> <version>1.122.0</version> </dependency> </dependencies>

Wenn Sie Gradle verwenden, fügen Sie Folgendes zu Ihrem hinzubuild.gradle:

Abhängigkeiten { Implementierung 'io.projectreactor:reactor - core:3.4.15' Implementierung 'com.google.cloud:google - cloud - pubsub:1.122.0' }

Veröffentlichen von Nachrichten in Google Cloud Pub/Sub mit Reactor Core

Beginnen wir mit einem Beispiel für die Veröffentlichung von Nachrichten in einem Pub/Sub-Thema mithilfe von Reactor Core.

import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; import reactor.core.publisher.Flux; import java.io.FileInputStream; import java.io.IOException; java.util.UUID importieren; public class PubSubPublisherExample { public static void main(String[] args) throws IOException { // Anmeldeinformationen einrichten GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("path/to/your/credentials.json")); CredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials); // Einen Themennamen erstellen ProjectTopicName topicName = ProjectTopicName.of("your - project - id", "your - topic - name"); // Einen Publisher erstellen Publisher editor = Publisher.newBuilder(topicName) .setCredentialsProvider(credentialsProvider) .build(); // Nachrichtenfluss erstellen Flux<PubsubMessage> messageFlux = Flux.range(1, 10) .map(i -> { String messageId = UUID.randomUUID().toString(); ByteString data = ByteString.copyFromUtf8("Message " + i + " with ID: " + messageId); return PubsubMessage.newBuilder() .setData(data) .putAttributes("messageId", messageId) .build( }); // Nachrichten veröffentlichen messageFlux.flatMap(message -> { return editor.publish(message).toFuture().thenApply(result -> { System.out.println("Nachricht veröffentlicht mit ID: " + result); return result; }); }).subscribe(); // Den Herausgeber herunterfahren editor.shutdown(); } }

In diesem Beispiel richten wir zunächst die Google Cloud-Anmeldeinformationen ein. Dann erstellen wir eineHerausgeberObjekt für das angegebene Thema. Wir verwenden aFlussum einen Nachrichtenstrom zu erzeugen. Jede Nachricht wird dann asynchron mithilfe von veröffentlichtflache KarteOperator.

Silicon Steel Iron Core factoryReactor Core

Abonnieren von Google Cloud Pub/Sub mit Reactor Core

Schauen wir uns nun an, wie Sie ein Pub/Sub-Abonnement abonnieren und Nachrichten mit Reactor Core verarbeiten.

import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.io.FileInputStream; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; public class PubSubSubscriberExample { public static void main(String[] args) throws IOException { // Anmeldeinformationen einrichten GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("path/to/your/credentials.json")); CredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials); // Einen Abonnementnamen erstellen ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("your - project - id", "your - subscription - name"); // Einen benutzerdefinierten Nachrichtenempfänger erstellen AtomicInteger counter = new AtomicInteger(0); Flux<PubsubMessage> messageFlux = Flux.create(sink -> { MessageReceiver Receiver = (Message, Consumer) -> { Sink.next(Message); Consumer.ack(); }; Subscriber subscriber = Subscriber.newBuilder(subscriptionName, Receiver) .setCredentialsProvider(credentialsProvider) .build(); subscriber.startAsync().awaitRunning(); sink.onCancel(subscriber::stopAsync }); // Nachrichten verarbeiten messageFlux.subscribe(message -> { System.out.println("Empfangene Nachricht: " + message.getData().toStringUtf8()); counter.incrementAndGet(); System.out.println("Gesamtzahl empfangener Nachrichten: " + counter.get()); }); } }

In diesem Beispiel erstellen wir eine benutzerdefinierteNachrichtenempfängerdas empfangene Nachrichten an a sendetFluss. DerFlusswird dann abonniert und jede Nachricht wird auf der Konsole gedruckt. Wir verfolgen auch die Gesamtzahl der empfangenen Nachrichten.

Erweiterte Anwendungsfälle

Nachrichtentransformation und -aggregation

Mit Reactor Core können Sie Nachrichten aus Pub/Sub einfach umwandeln und aggregieren. Sie können beispielsweise die Nachrichtendaten von JSON in ein Java-Objekt konvertieren oder Nachrichten über ein bestimmtes Zeitfenster aggregieren.

import com.google.pubsub.v1.PubsubMessage; import reactor.core.publisher.Flux; import java.time.Duration; öffentliche Klasse MessageTransformationExample { public static void main(String[] args) { Flux<PubsubMessage> messageFlux = getMessageFluxFromPubSub(); Flux<String> transformFlux = messageFlux .map(message -> message.getData().toStringUtf8()) .map(text -> text.toUpperCase()); Flux<Integer>aggregatedFlux = transformFlux .bufferTimeout(10, Duration.ofSeconds(5)) .map(list -> list.size()); aggregierteFlux.subscribe(count -> System.out.println("Anzahl der Nachrichten im Fenster: " + count)); } private static Flux<PubsubMessage> getMessageFluxFromPubSub() { // Code zum Abrufen des Nachrichtenflusses von Pub/Sub return Flux.empty(); } }

In diesem Beispiel wandeln wir zunächst jede Nachricht in Großbuchstaben um. Dann aggregieren wir die Nachrichten in einem Puffer von 10 Nachrichten oder einem Zeitfenster von 5 Sekunden und zählen die Anzahl der Nachrichten in jedem Fenster.

Abschluss

Die Verwendung von Reactor Core für das Datenstreaming in Google Cloud Pub/Sub kann Ihren Anwendungen erhebliche Vorteile bringen. Es bietet eine asynchrone und reaktive Möglichkeit, Nachrichten zu verarbeiten, was die Leistung und Skalierbarkeit Ihrer Datenstreaming-Lösungen verbessern kann. Unabhängig davon, ob Sie Pub/Sub veröffentlichen oder abonnieren, bietet Reactor Core eine leistungsstarke und flexible API zur Verwaltung des Datenflusses.

Wenn Sie an der Verwendung interessiert sindReaktorkernfür Ihre Google Cloud Pub/Sub-Projekte oder die Erkundung anderer verwandter Lösungen wie zSiliziumstahl-EisenkernFür die Beschaffung und weitere Gespräche können Sie sich gerne an uns wenden. Wir sind bestrebt, qualitativ hochwertige Produkte und Dienstleistungen bereitzustellen, die Ihren Anforderungen entsprechen.

Referenzen

  • Google Cloud Pub/Sub-Dokumentation
  • Reaktorkerndokumentation