Source code for nameko_chassis.dependencies

import logging
import os
import sys

import sentry_sdk
from nameko.containers import WorkerContext
from nameko.extensions import DependencyProvider
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.propagate import set_global_textmap
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from pyrabbit.api import Client
from sentry_sdk.integrations.opentelemetry import (  # type: ignore
    SentryPropagator,
    SentrySpanProcessor,
)
from sentry_sdk.utils import BadDsn

from .discovery import ServiceDiscovery

logger = logging.getLogger(__name__)


[docs] class ContainerProvider(DependencyProvider): """ Allows access to ``ServiceContainer`` running current worker. """
[docs] def get_dependency(self, worker_ctx): """ Returns a ``ServiceContainer`` instance which runs current worker. """ return self.container
[docs] class ServiceDiscoveryProvider(DependencyProvider): def __init__(self, management_host: str, username: str, password: str): self.management_host = management_host self.username = username self.password = password
[docs] def get_dependency(self, worker_ctx: WorkerContext) -> ServiceDiscovery: client = Client(self.management_host, self.username, self.password) return ServiceDiscovery(client)
[docs] def setup_sentry_sdk(**kwargs) -> None: """ Initialize Sentry integration. Call it once per service container. All keywords arguments are forwarded to ``sentry_sdk.init()`` (with some defaults). """ defaults = { "release": os.environ.get("APP_VERSION", None), "environment": os.environ.get("SENTRY_ENVIRONMENT", None), "dsn": os.environ.get("SENTRY_DSN", None), "traces_sample_rate": 1.0, "enable_tracing": True, "before_send": before_send_filter, "before_send_transaction": filter_transactions, "instrumenter": "otel", } params = defaults | kwargs if params["dsn"] is None: logger.info("Skipped sentry init; no DSN configured") else: try: sentry_sdk.init(**params) logger.info("Sentry SDK ready") except BadDsn as err: logger.exception( "Error initializing Sentry integration", extra=dict(error=str(err)) ) sys.exit(1)
def before_send_filter(event, hint): # also ignore retryable error log messages if "log_record" in hint: if "Retryable error:" in hint["log_record"].message: return None return event def filter_transactions(event, hint): if event["transaction"] == "/metrics": return None return event def add_sentry_service(service_name: str) -> None: resource = Resource(attributes={SERVICE_NAME: service_name}) provider = TracerProvider(resource=resource) provider.add_span_processor(SentrySpanProcessor()) processor = BatchSpanProcessor(OTLPSpanExporter()) provider.add_span_processor(processor) trace.set_tracer_provider(provider) set_global_textmap(SentryPropagator())