Skip to content

Send streaming LLM telemetry to Langfuse

Overview

This example uses InferenceRuntime with streaming enabled. It configures the Langfuse connection inline and sends the full LLM and HTTP lifecycle — including the complete response body — to Langfuse.

For streaming responses the HTTP span stays open while chunks arrive and closes only when the stream is exhausted (HttpStreamCompleted). By enabling captureStreamingChunks: true each SSE chunk is also recorded as a log event under the http.client.request span, which is useful for debugging but should be left off in production.

Key concepts:

- explicit LangfuseConfig / LangfuseHttpTransport / LangfuseExporter setup
- withStreaming(): requests a server-sent-events stream from the LLM provider
- HttpClientTelemetryProjector($hub, captureStreamingChunks: true): records each chunk and closes the HTTP span with the full body on stream completion
- HttpStreamCompleted: new event fired when the stream generator is exhausted
- Telemetry::flush(): must be called after the stream is consumed

Example

<?php declare(strict_types=1);

require 'examples/boot.php';

use Cognesy\Config\Env;
use Cognesy\Events\Dispatchers\EventDispatcher;
use Cognesy\Http\Telemetry\HttpClientTelemetryProjector;
use Cognesy\Messages\Messages;
use Cognesy\Polyglot\Inference\Inference;
use Cognesy\Polyglot\Inference\InferenceRuntime;
use Cognesy\Polyglot\Inference\LLMProvider;
use Cognesy\Polyglot\Telemetry\PolyglotTelemetryProjector;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseConfig;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseExporter;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseHttpTransport;
use Cognesy\Telemetry\Application\Registry\TraceRegistry;
use Cognesy\Telemetry\Application\Telemetry;
use Cognesy\Telemetry\Application\Projector\CompositeTelemetryProjector;
use Cognesy\Telemetry\Application\Projector\RuntimeEventBridge;

$serviceName = 'examples.b03.telemetry-streaming-langfuse';

// Read a required Langfuse credential from the environment, failing fast
// with setup guidance when it is missing or empty.
$requireEnv = static function (string $name): string {
    $value = (string) Env::get($name, '');
    if ($value === '') {
        throw new RuntimeException("Set {$name} in .env to run this example.");
    }
    return $value;
};

$baseUrl = $requireEnv('LANGFUSE_BASE_URL');
$publicKey = $requireEnv('LANGFUSE_PUBLIC_KEY');
$secretKey = $requireEnv('LANGFUSE_SECRET_KEY');

// Explicit Langfuse wiring: config -> HTTP transport -> exporter -> telemetry hub.
$events = new EventDispatcher($serviceName);
$hub = new Telemetry(
    registry: new TraceRegistry(),
    exporter: new LangfuseExporter(
        transport: new LangfuseHttpTransport(new LangfuseConfig(
            baseUrl: $baseUrl,
            publicKey: $publicKey,
            secretKey: $secretKey,
        )),
    ),
);

// Bridge runtime events into the telemetry projectors.
(new RuntimeEventBridge(new CompositeTelemetryProjector([
    new PolyglotTelemetryProjector($hub),
    // captureStreamingChunks: true logs every SSE chunk as an event under
    // the http.client.request span — great for debugging, disable in production
    new HttpClientTelemetryProjector($hub, captureStreamingChunks: true),
])))->attachTo($events);

$runtime = InferenceRuntime::fromProvider(
    provider: LLMProvider::using('openai'),
    events: $events,
);

// withStreaming() requests an SSE stream; the HTTP span stays open until
// the generator below is exhausted.
$stream = Inference::fromRuntime($runtime)
    ->with(
        messages: Messages::fromString('Explain in 3 bullet points why distributed tracing matters for streaming AI responses.'),
        options: ['max_tokens' => 200],
    )
    ->withStreaming()
    ->stream();

echo "Response (streaming):\n";
$fullContent = '';
foreach ($stream->deltas() as $delta) {
    echo $delta->contentDelta;
    $fullContent .= $delta->contentDelta;
}
echo "\n\n";

// Flush AFTER the stream is fully consumed: HttpStreamCompleted fires when
// the generator above is exhausted, carrying the full raw HTTP response body.
$hub->flush();

echo "Telemetry: flushed to Langfuse\n";

// Sanity check for the example run (assert() may be disabled in production,
// which is fine — this is illustrative, not a production guard).
assert($fullContent !== '');