Send streaming LLM telemetry to Langfuse
Overview
This example uses InferenceRuntime with streaming enabled, configures the
Langfuse connection inline, and sends the full LLM and HTTP lifecycle —
including the complete response body — to Langfuse.
For streaming responses the HTTP span stays open while chunks arrive and closes
only when the stream is exhausted (HttpStreamCompleted). By enabling
captureStreamingChunks: true each SSE chunk is also recorded as a log event
under the http.client.request span, which is useful for debugging but should
be left off in production.
Key concepts:
- explicit LangfuseConfig / LangfuseHttpTransport / LangfuseExporter setup
- withStreaming(): requests a server-sent-events stream from the LLM provider
- HttpClientTelemetryProjector($hub, captureStreamingChunks: true): records
each chunk and closes the HTTP span with the full body on stream completion
- HttpStreamCompleted: new event fired when the stream generator is exhausted
- $hub->flush() (Telemetry::flush()): must be called after the stream is fully consumed
Example
<?php
require 'examples/boot.php';
use Cognesy\Config\Env;
use Cognesy\Events\Dispatchers\EventDispatcher;
use Cognesy\Http\Telemetry\HttpClientTelemetryProjector;
use Cognesy\Messages\Messages;
use Cognesy\Polyglot\Inference\Inference;
use Cognesy\Polyglot\Inference\InferenceRuntime;
use Cognesy\Polyglot\Inference\LLMProvider;
use Cognesy\Polyglot\Telemetry\PolyglotTelemetryProjector;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseConfig;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseExporter;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseHttpTransport;
use Cognesy\Telemetry\Application\Registry\TraceRegistry;
use Cognesy\Telemetry\Application\Telemetry;
use Cognesy\Telemetry\Application\Projector\CompositeTelemetryProjector;
use Cognesy\Telemetry\Application\Projector\RuntimeEventBridge;
$serviceName = 'examples.b03.telemetry-streaming-langfuse';

// Read a required environment variable, failing fast with an actionable
// message when it is missing. All three Langfuse settings are mandatory,
// so the identical read-and-validate step is factored into one closure.
$requireEnv = static function (string $name): string {
    $value = (string) Env::get($name, '');
    if ($value === '') {
        throw new RuntimeException("Set {$name} in .env to run this example.");
    }
    return $value;
};

$baseUrl = $requireEnv('LANGFUSE_BASE_URL');
$publicKey = $requireEnv('LANGFUSE_PUBLIC_KEY');
$secretKey = $requireEnv('LANGFUSE_SECRET_KEY');
$events = new EventDispatcher($serviceName);

// Explicit Langfuse wiring: config -> transport -> exporter -> telemetry hub.
$langfuseConfig = new LangfuseConfig(
    baseUrl: $baseUrl,
    publicKey: $publicKey,
    secretKey: $secretKey,
);
$exporter = new LangfuseExporter(
    transport: new LangfuseHttpTransport($langfuseConfig),
);
$hub = new Telemetry(
    registry: new TraceRegistry(),
    exporter: $exporter,
);

// Bridge runtime events into telemetry spans. captureStreamingChunks: true
// logs every SSE chunk as an event under the http.client.request span —
// great for debugging, disable in production.
$projector = new CompositeTelemetryProjector([
    new PolyglotTelemetryProjector($hub),
    new HttpClientTelemetryProjector($hub, captureStreamingChunks: true),
]);
(new RuntimeEventBridge($projector))->attachTo($events);

$runtime = InferenceRuntime::fromProvider(
    provider: LLMProvider::using('openai'),
    events: $events,
);
// Request a server-sent-events stream from the LLM provider.
$prompt = 'Explain in 3 bullet points why distributed tracing matters for streaming AI responses.';
$stream = Inference::fromRuntime($runtime)
    ->with(
        messages: Messages::fromString($prompt),
        options: ['max_tokens' => 200],
    )
    ->withStreaming()
    ->stream();

echo "Response (streaming):\n";

// Echo each delta as it arrives while accumulating the full response text.
// The HTTP span stays open for the whole loop and closes once the generator
// is exhausted.
$fullContent = '';
foreach ($stream->deltas() as $partial) {
    $chunk = $partial->contentDelta;
    echo $chunk;
    $fullContent .= $chunk;
}
echo "\n\n";

// Flush AFTER the stream is fully consumed: HttpStreamCompleted fires when
// the generator above is exhausted, carrying the full raw HTTP response body.
$hub->flush();
echo "Telemetry: flushed to Langfuse\n";

assert($fullContent !== '');
?>