Source code for nameko_chassis.service

from __future__ import annotations

import logging
import os
import socket
import time
import traceback
from dataclasses import asdict, dataclass
from typing import Any, Dict, List

from nameko.containers import ServiceContainer
from nameko.rpc import rpc
from nameko.web.handlers import http
from nameko_prometheus import PrometheusMetrics
from nameko_sentry import SentryReporter
from werkzeug.wrappers import Request, Response

from .dependencies import ContainerProvider, OpenTelemetryConfig, SentryLoggerConfig

START_TIME = time.time()


@dataclass(frozen=True)
class WorkerState:
    """
    Immutable snapshot of one running worker greenthread.

    All values are plain strings (or containers of strings) so that the
    snapshot can be converted with ``dataclasses.asdict()``.
    """

    # name of the service class the worker belongs to
    class_name: str
    # name of the entrypoint method the worker is executing
    method_name: str
    # positional call arguments, each rendered with ``str()``
    args: List[str]
    # keyword call arguments, values rendered with ``str()``
    kwargs: Dict[str, str]
    # worker context data, values rendered with ``str()``
    data: Dict[str, str]
    # formatted stack trace of the greenthread, as a list of text entries
    stacktrace: List[str]
@dataclass(frozen=True)
class ServiceState:
    """
    Snapshot of a whole service: identity, uptime, declared entrypoints and
    dependencies, worker pool usage and the state of every running worker.
    """

    version: str
    service_name: str
    uptime: float
    entrypoints: List[str]
    dependencies: List[str]
    running_workers: int
    max_workers: int
    worker_states: List[WorkerState]

    @classmethod
    def from_container(cls, container: ServiceContainer) -> ServiceState:
        """
        Build a ServiceState by inspecting a service container and the
        worker greenthreads it currently tracks.
        """
        service_class_name = container.service_cls.__name__

        snapshots: List[WorkerState] = []
        for worker_ctx, greenthread in container._worker_threads.items():
            method_name = worker_ctx.entrypoint.method_name
            # Arguments are only ever previewed, so they are stringified up
            # front; asdict() struggles with richer argument types such as
            # Request.
            positional = [str(value) for value in worker_ctx.args]
            keyword = {
                key: str(value) for key, value in worker_ctx.kwargs.items()
            }
            context_data = {
                key: str(value)
                for key, value in worker_ctx.context_data.items()
            }
            # Render the greenthread's current stack as a list of text entries.
            frames = traceback.extract_stack(greenthread.gr_frame)
            snapshots.append(
                WorkerState(
                    class_name=service_class_name,
                    method_name=method_name,
                    args=positional,
                    kwargs=keyword,
                    data=context_data,
                    stacktrace=traceback.format_list(frames),
                )
            )

        app_version = os.environ.get("APP_VERSION", "unknown")
        service_name = container.service_name
        seconds_up = time.time() - START_TIME
        entrypoint_names = [entry.method_name for entry in container.entrypoints]
        dependency_names = [dep.attr_name for dep in container.dependencies]

        return ServiceState(
            version=app_version,
            service_name=service_name,
            uptime=seconds_up,
            entrypoints=entrypoint_names,
            dependencies=dependency_names,
            running_workers=len(container._worker_threads),
            max_workers=container.max_workers,
            worker_states=snapshots,
        )
class Service:
    """
    Base class for nameko services.

    Declares the shared dependencies (Sentry, OpenTelemetry, container
    access, Prometheus metrics) and a handful of diagnostic entrypoints.
    """

    name = "no name"

    # Bound to a class attribute like the other dependencies: a bare
    # ``SentryLoggerConfig()`` expression creates an instance that is
    # discarded as soon as the class body finishes executing.
    # NOTE(review): assumes SentryLoggerConfig is a nameko dependency provider
    # (those are only discovered as class attributes) - confirm in
    # ``.dependencies``.
    sentry_logger_config = SentryLoggerConfig()
    otel_config = OpenTelemetryConfig()
    sentry = SentryReporter()
    container = ContainerProvider()
    metrics = PrometheusMetrics()

    @rpc
    def say_hello(self) -> str:
        """
        RPC method to ping the service to check if it can be reached.
        """
        return f"Hello from {self.name}!"

    @rpc
    def query_state(self) -> Dict[str, Any]:
        """
        Returns a detailed state of running service.

        The result is a ``ServiceState`` converted to a plain dictionary
        with ``dataclasses.asdict()``.
        """
        return asdict(ServiceState.from_container(self.container))

    @http("GET", "/metrics")
    def serve_metrics(self, request: Request) -> Response:
        """
        Exposes Prometheus metrics over HTTP.
        """
        return self.metrics.expose_metrics(request)

    @rpc
    def set_log_level(self, logger_name: str, level: int) -> str:
        """
        Temporarily override log level in a running service.

        Useful for example for debugging a live service instance, where your
        default log level is INFO or higher to avoid clutter in logs. This RPC
        allows you to change log level while the application is running.

        For example::

            >>> n.rpc.my_service.set_log_level("some.module", logging.DEBUG)

        Now your logs will include debug messages from ``some.module`` even
        if your static log configuration (dictConfig etc.) silenced them.

        Caveat #1: Updating log level in this manner will only affect loggers
        acquired *after* this RPC call. So your code must call
        ``logging.getLogger()`` as late as possible. This unfortunately means
        that library code may or may not be affected - depends on how the
        library acquires its loggers.

        Caveat #2: If your service runs in multiple replicas behind a load
        balancer, you must call this RPC method at least as many times as
        there are replicas to ensure that each replica will have its log level
        changed.

        Returns a message naming the host that handled the call and the RPC
        invocation that restores the previous level.
        """
        logger = logging.getLogger(__name__)
        logger_to_change = logging.getLogger(logger_name)
        logger.info(
            f"Updating level for {logger_name} from {logger_to_change.level} to {level}"
        )
        # Build the message before calling setLevel() so that it still
        # carries the previous level for the revert hint.
        message = f"""
        Log level changed on host {socket.gethostname()}. Revert with:
        n.rpc.{self.name}.set_log_level({logger_name!r}, {logger_to_change.level!r})
        """
        logger_to_change.setLevel(level)
        return message