Kevin Grote

Event-Driven AI Response Streaming with Axon Framework - A Practical Guide


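Let’s dive into a practical implementation of AI response streaming built on an event-driven architecture with Axon Framework and Spring. The AI model's answer reaches clients chunk by chunk as it is generated. Meanwhile, commands, events, and queries keep the write side, the AI processing, and the read side cleanly separated.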

Core Components

Commands and Events

data class TestCommand(@TargetAggregateIdentifier val id: UUID, val message: String)
data class TestEvent(val id: UUID, val message: String)
data class TestAiResponseCommand(@TargetAggregateIdentifier val id: UUID, val message: String)
data class TestAiResponseEvent(val id: UUID, val message: String)

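These simple data classes form our event-driven backbone. TestCommand asks the aggregate to start a conversation, and the aggregate records it as a TestEvent, which triggers the AI call. When the stream finishes, TestAiResponseCommand carries the complete answer back to the aggregate, which records it as a TestAiResponseEvent for the read side.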
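To see how the four messages chain together without any infrastructure, here is a minimal plain-Kotlin sketch of the round trip. It is not Axon code. InMemoryFlow, its events list, and the fakeAi function are hypothetical stand-ins for the aggregate, the event store, and the OpenAI call, and everything runs synchronously.

```kotlin
import java.util.UUID

// Same message shapes as in the article (without the Axon annotation).
data class TestCommand(val id: UUID, val message: String)
data class TestEvent(val id: UUID, val message: String)
data class TestAiResponseCommand(val id: UUID, val message: String)
data class TestAiResponseEvent(val id: UUID, val message: String)

// Hypothetical, synchronous stand-in for aggregate + event bus + event store.
class InMemoryFlow(private val fakeAi: (String) -> String) {
    val events = mutableListOf<Any>()   // plays the role of the event store

    fun send(command: Any) {
        // Command side: each command is recorded as exactly one event.
        val event: Any = when (command) {
            is TestCommand -> TestEvent(command.id, command.message)
            is TestAiResponseCommand -> TestAiResponseEvent(command.id, command.message)
            else -> error("Unknown command: $command")
        }
        events.add(event)
        publish(event)
    }

    private fun publish(event: Any) {
        // Event side: a TestEvent triggers the (fake) AI call, and the answer
        // comes back as a new command, as the Processor does with the real model.
        if (event is TestEvent) {
            send(TestAiResponseCommand(event.id, fakeAi(event.message)))
        }
    }
}
```

Sending one TestCommand leaves exactly two events in the list: the TestEvent and, once the fake AI has answered, the TestAiResponseEvent.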

The Shared Flux Component

@Component
class SharedFlux {
    val sink: Sinks.Many<Map<UUID, String>> = Sinks.many().multicast().onBackpressureBuffer()
    val flux: Flux<Map<UUID, String>> = sink.asFlux()
}

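This component acts as our broadcast mechanism. Each chunk is emitted as a map from a conversation's UUID to the chunk text, and every subscriber of flux receives it, so multiple clients can follow the AI stream in real time and filter for the conversation they care about. Because the sink is created with multicast().onBackpressureBuffer(), chunks emitted before the first subscriber arrives are buffered (up to a bounded queue size) rather than dropped.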
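A rough, framework-free model of what SharedFlux does, assuming a single JVM and synchronous delivery. KeyedBroadcaster, subscribeAll, subscribe, and emit are hypothetical names. Unlike the Reactor sink, this sketch does not buffer chunks for late subscribers and has no backpressure.

```kotlin
import java.util.UUID
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for SharedFlux: every emission is a one-entry map
// (conversation id -> chunk) delivered to all current subscribers.
class KeyedBroadcaster<K, V> {
    private val subscribers = CopyOnWriteArrayList<(Map<K, V>) -> Unit>()

    // Like subscribing to sharedFlux.flux: sees every emission.
    fun subscribeAll(onNext: (Map<K, V>) -> Unit) {
        subscribers.add(onNext)
    }

    // Like sharedFlux.flux.filter { it.containsKey(id) }.map { it[id]!! }
    fun subscribe(key: K, onNext: (V) -> Unit) {
        subscribeAll { emission -> emission[key]?.let(onNext) }
    }

    // Like sink.emitNext(mapOf(id to chunk), ...): multicast to everyone.
    fun emit(key: K, value: V) {
        val emission = mapOf(key to value)
        subscribers.forEach { it(emission) }
    }
}
```

Two subscribers on the same id both receive every chunk for that id, while chunks for other ids are filtered out.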

The Implementation

1. Processing AI Responses

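@Component
class Processor(
    private val openAiChatModel: OpenAiChatModel,
    private val sharedFlux: SharedFlux,
    private val commandGateway: CommandGateway
) {
    @EventHandler
    fun on(event: TestEvent) {
        val prompt = Prompt(UserMessage(event.message))
        val finalResponse = StringBuilder()
        openAiChatModel.stream(prompt)
            .doOnNext { response ->
                val content = response.result.output.content ?: ""
                finalResponse.append(content)  // Accumulate the complete answer
                sharedFlux.sink.emitNext(mapOf(event.id to content), Sinks.EmitFailureHandler.FAIL_FAST)  // Broadcast the chunk, keyed by id
            }
            .doOnComplete {
                // Store the complete answer through the aggregate
                commandGateway.send<TestAiResponseCommand>(TestAiResponseCommand(event.id, finalResponse.toString()))
            }
            .subscribe()  // Nothing is streamed until the Flux is subscribed
    }
}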
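The doOnNext/doOnComplete pair does two jobs at once: it forwards every chunk immediately and accumulates the whole answer for the final command. Here is a framework-free sketch of that contract, with a plain Sequence standing in for the Flux of chat responses. The function name streamAndCollect and its callbacks are hypothetical.

```kotlin
// Hypothetical stand-in for the streaming call: each element is one chunk's
// content, which may be null (hence the `?: ""` in the Processor).
fun streamAndCollect(
    chunks: Sequence<String?>,
    onChunk: (String) -> Unit,      // like sink.emitNext(...)
    onComplete: (String) -> Unit    // like doOnComplete { commandGateway.send(...) }
) {
    val finalResponse = StringBuilder()
    for (chunk in chunks) {
        val content = chunk ?: ""
        finalResponse.append(content)   // accumulate for the final command
        onChunk(content)                // forward the chunk right away
    }
    onComplete(finalResponse.toString())
}
```

Clients see the chunks as they arrive, and the aggregate receives one command carrying the concatenated text.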

2. API Endpoints

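@RestController
class SharedFluxController(
    val queryGateway: QueryGateway,
    private val sharedFlux: SharedFlux,
    val commandGateway: CommandGateway,
) {
    // One conversation id per controller instance, as in the complete implementation
    private val id: UUID = UUID.randomUUID()

    @GetMapping("/generateStream", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun generateStream(@RequestParam(value = "message", defaultValue = "Tell me a joke") message: String): Flux<Map<UUID, String>> {
        commandGateway.send<TestCommand>(TestCommand(id, message))
        return sharedFlux.flux  // Every chunk of every conversation
    }

    @GetMapping("/liveResponse", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun getLiveResponse(): Flux<String> {
        return sharedFlux.flux
            .filter { it.containsKey(id) }  // Keep only this conversation's chunks
            .map { it[id]!! }
    }
}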
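Because chunks are keyed by UUID, the id is what keeps concurrent conversations apart. In the complete implementation the id is a field of the controller, and Spring beans are singletons by default, so every request shares one conversation. Below is a hedged plain-Kotlin sketch of the alternative: generate a fresh id per request and hand it back, so each client follows only its own chunks. Conversations, start, and liveResponse are hypothetical, and a list of emissions replaces the Flux.

```kotlin
import java.util.UUID

// Hypothetical sketch: one shared log of (id -> chunk) emissions, standing in
// for the shared Flux, plus a fresh id per request instead of one per controller.
class Conversations {
    private val emissions = mutableListOf<Map<UUID, String>>()

    // Like generateStream, but returns the new id so the client can filter on it.
    fun start(chunks: List<String>): UUID {
        val id = UUID.randomUUID()
        chunks.forEach { emissions.add(mapOf(id to it)) }
        return id
    }

    // Like getLiveResponse: filter { it.containsKey(id) }.map { it[id]!! }
    fun liveResponse(id: UUID): List<String> =
        emissions.filter { it.containsKey(id) }.map { it.getValue(id) }
}
```

With per-request ids the same filter expression used in getLiveResponse isolates each conversation, even though all chunks travel through one shared channel.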

Key Benefits

  1. Separation of Concerns

    • Commands handle user requests
    • Events trigger the AI call and record its responses
    • Queries retrieve final results
    • Shared Flux manages real-time updates
  2. Real-Time Streaming

    • Immediate updates as AI generates responses
    • No polling required
    • Efficient multicast to all subscribers
  3. Event Sourcing

    • Complete history of AI responses
    • Ability to replay events
    • Query final responses at any time
  4. Scalability

    • Decoupled components
    • Event-driven architecture
    • Commands and events can later travel over a distributed bus, although the SharedFlux itself lives in memory in a single JVM
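The event-sourcing benefits follow from the read model being a fold over stored events. The QueryHandler in the complete implementation filters the stored events for TestAiResponseEvents and returns the last message. Here is a plain-Kotlin sketch of that projection, assuming an in-memory list stands in for Axon's EventStore (project is a hypothetical name).

```kotlin
import java.util.UUID

data class TestEvent(val id: UUID, val message: String)
data class TestAiResponseEvent(val id: UUID, val message: String)
data class TestReadModel(val message: String)

// Replays the stored history for one conversation and derives the read model,
// mirroring the article's QueryHandler.handle(TestQuery).
fun project(history: List<Any>, id: UUID): TestReadModel {
    val responses = history
        .filterIsInstance<TestAiResponseEvent>()
        .filter { it.id == id }
    return if (responses.isEmpty()) TestReadModel("No response yet")
        else TestReadModel(responses.last().message)
}
```

Because the answer is recomputed from the history, replaying the same events always yields the same read model, and new projections can be added later without changing the write side.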

Practical Use Cases

This architecture is particularly useful for:

  • Chat applications requiring real-time responses
  • AI-powered content generation tools
  • Interactive AI assistants
  • Any system requiring real-time AI interaction with multiple clients

Implementation Notes

  • Uses Spring AI’s OpenAiChatModel for AI integration
  • Leverages Axon Framework for event sourcing and CQRS
  • Implements both streaming and query-based endpoints
  • Maintains state through event sourcing

The beauty of this approach lies in its simplicity and separation of concerns, while still providing robust functionality for real-time AI response streaming.

Complete Implementation

package com.versilite.demo.liveSharedAiResponse

import org.axonframework.commandhandling.CommandHandler
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.eventhandling.EventHandler
import org.axonframework.eventsourcing.EventSourcingHandler
import org.axonframework.eventsourcing.eventstore.EventStore
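import org.axonframework.messaging.responsetypes.ResponseTypes
import org.axonframework.modelling.command.*
import org.axonframework.queryhandling.QueryGateway
import org.axonframework.queryhandling.QueryHandler
import org.axonframework.queryhandling.QueryUpdateEmitter
import org.axonframework.spring.stereotype.Aggregate
import org.springframework.ai.chat.messages.UserMessage
import org.springframework.ai.chat.prompt.Prompt
import org.springframework.ai.openai.OpenAiChatModel
import org.springframework.ai.openai.OpenAiChatOptions
import org.springframework.ai.openai.api.OpenAiApi
import org.springframework.http.MediaType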
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.util.*

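@RestController
class TestController(
    private val openAiChatModel: OpenAiChatModel
) {

    @GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE])  // Add a path of your choice
    fun generateStream(
        @RequestParam(
            value = "message",
            defaultValue = "Tell me a joke"
        ) message: String
    ): Flux<String> {
        val options = OpenAiChatOptions.builder().build()
        val prompt = Prompt(UserMessage(message), options)
        val fullResponse = StringBuilder()
        return openAiChatModel.stream(prompt)
            .doOnNext { response ->
                fullResponse.append(response.result.output.content ?: "")  // Accumulate for logging
            }.doOnComplete {
                println("Full response: $fullResponse")
            }.map {
                it.result.output.content ?: ""  // Send only the chunk text to the client
            }
    }
}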
data class TestCommand(@TargetAggregateIdentifier val id: UUID, val message: String)
data class TestEvent(val id: UUID, val message: String)

data class TestAiResponseCommand(@TargetAggregateIdentifier val id: UUID, val message: String)
data class TestAiResponseEvent(val id: UUID, val message: String)

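@Aggregate
class TestAggregate {
    @AggregateIdentifier
    private lateinit var id: UUID

    constructor() // Required no-args constructor for Axon

    @CommandHandler
    @CreationPolicy(AggregateCreationPolicy.CREATE_IF_MISSING)  // The controller reuses one id, so create on first use
    fun handle(command: TestCommand) {
        println("Handling command")
        AggregateLifecycle.apply(TestEvent(command.id, command.message))
    }

    @CommandHandler
    fun handle(command: TestAiResponseCommand) {
        println("Handling ai response command")
        AggregateLifecycle.apply(TestAiResponseEvent(command.id, command.message))
    }

    @EventSourcingHandler
    fun on(event: TestEvent) {
        id = event.id
    }
}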

@Component
class Processor(
    private val openAiChatModel: OpenAiChatModel,
    private val sharedFlux: SharedFlux,
    private val commandGateway: CommandGateway
) {
    @EventHandler
    fun on(event: TestEvent) {
        val prompt = Prompt(UserMessage(event.message), OpenAiChatOptions.builder().withModel(OpenAiApi.ChatModel.GPT_4_O_MINI).build())
        val finalResponse = StringBuilder()
        openAiChatModel.stream(prompt)
            .doOnNext { response ->
                val content = response.result.output.content ?: ""
                finalResponse.append(content)  // Collect the full answer for the final command
                val map = mapOf(event.id to content)
                sharedFlux.sink.emitNext(map, Sinks.EmitFailureHandler.FAIL_FAST)  // Emit each response chunk to the shared sink
            }
            .doOnComplete {
                // Complete the stream after processing is done. Note: this terminates the shared
                // sink for every subscriber, so later conversations receive no chunks.
                sharedFlux.sink.tryEmitComplete()
                commandGateway.send<TestAiResponseCommand>(TestAiResponseCommand(event.id, finalResponse.toString()))
            }
            .subscribe()  // Nothing is streamed until the Flux is subscribed
    }
}


data class TestQuery(val id: UUID)
data class TestReadModel(val message: String)

@Component
class SharedFlux {
    val sink: Sinks.Many<Map<UUID, String>> = Sinks.many().multicast().onBackpressureBuffer()
    val flux: Flux<Map<UUID, String>> = sink.asFlux()  // Shared Flux that emits values to subscribers
}

@RestController
class SharedFluxController(
    val queryGateway: QueryGateway,
    private val sharedFlux: SharedFlux,
    val commandGateway: CommandGateway,
) {
    // One id per controller instance (a Spring singleton), so all requests share one conversation
    private val id: UUID = UUID.randomUUID()

    @GetMapping("/generateStream", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun generateStream(
        @RequestParam(
            value = "message",
            defaultValue = "Tell me a joke"
        ) message: String
    ): Flux<Map<UUID, String>> {
        println("Our id in generateStream is $id")
        commandGateway.send<TestCommand>(TestCommand(id, message))
        return sharedFlux.flux  // Return the shared flux for streaming to the client
    }

    @GetMapping("/finalResponse", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun getFinalResponse(): Flux<TestReadModel> {
        println("Our id in finalResponse is $id")
        val query = queryGateway.subscriptionQuery(
            TestQuery(id),
            ResponseTypes.instanceOf(TestReadModel::class.java),
            ResponseTypes.instanceOf(TestReadModel::class.java)
        )
        // Current read model first, then every update emitted by the query handler
        return query.initialResult().concatWith(query.updates())
    }

    @GetMapping("/liveResponse", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun getLiveResponse(): Flux<String> {
        println("Our id in liveResponse is $id")
        return sharedFlux.flux.filter { it.containsKey(id) }.map { it[id]!! }
    }
}

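@Component
class QueryHandler(
    private val eventStore: EventStore,
    private val queryUpdateEmitter: QueryUpdateEmitter
) {
    @QueryHandler
    fun handle(query: TestQuery): TestReadModel {
        val events = eventStore
            .readEvents(query.id.toString())  // All stored events of this aggregate
            .asSequence()
            .filter { it.payload is TestAiResponseEvent }
            .map { it.payload as TestAiResponseEvent }
            .toList()
        if (events.isEmpty()) {
            return TestReadModel("No response yet")
        }
        return TestReadModel(events.last().message)
    }

    @EventHandler
    fun on(event: TestAiResponseEvent) {
        // Push the new answer to subscription queries for the same id
        queryUpdateEmitter.emit(
            TestQuery::class.java,
            { query -> query.id == event.id },
            TestReadModel(event.message)
        )
    }
}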
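The /finalResponse endpoint relies on an Axon subscription query. The client first gets the query handler's current answer, and QueryUpdateEmitter then pushes a new TestReadModel whenever a TestAiResponseEvent matching the query's id is handled. The in-memory sketch below imitates only that contract; SubscriptionQueries, subscribe, and emit are hypothetical and are not Axon's API.

```kotlin
import java.util.UUID

data class TestQuery(val id: UUID)
data class TestReadModel(val message: String)

// Hypothetical imitation of a subscription query: a subscriber first receives
// the handler's current answer, then every update whose filter matches its query.
class SubscriptionQueries(private val handler: (TestQuery) -> TestReadModel) {
    private val subscribers = mutableListOf<Pair<TestQuery, (TestReadModel) -> Unit>>()

    // Like subscriptionQuery(...) followed by initialResult().concatWith(updates())
    fun subscribe(query: TestQuery, onNext: (TestReadModel) -> Unit) {
        onNext(handler(query))            // initial result
        subscribers.add(query to onNext)  // register for later updates
    }

    // Like queryUpdateEmitter.emit(TestQuery::class.java, filter, update)
    fun emit(filter: (TestQuery) -> Boolean, update: TestReadModel) {
        subscribers.filter { (query, _) -> filter(query) }
            .forEach { (_, onNext) -> onNext(update) }
    }
}
```

A subscriber therefore sees "No response yet" first and the real answer once it has been stored, while updates for other ids never reach it.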


I’m Kevin, a software engineer who calls Cyprus home. I like to travel, to cook, and to build companies, and I am currently building a software agency. I plan to write about whatever comes to mind, whether that is mental health, entrepreneurship, technology, or any other topic. I hope you enjoy my blog.