# Copyright The OpenTelemetry Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # pylint: disable=unused-import from abc import ABC, abstractmethod from threading import Lock from time import time_ns from typing import Iterable, List, Mapping # This kind of import is needed to avoid Sphinx errors. import mysql.opentelemetry.sdk.metrics import mysql.opentelemetry.sdk.metrics._internal.instrument import mysql.opentelemetry.sdk.metrics._internal.sdk_configuration from mysql.opentelemetry.metrics._internal.instrument import CallbackOptions from mysql.opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError from mysql.opentelemetry.sdk.metrics._internal.measurement import Measurement from mysql.opentelemetry.sdk.metrics._internal.metric_reader_storage import ( MetricReaderStorage, ) from mysql.opentelemetry.sdk.metrics._internal.point import Metric class MeasurementConsumer(ABC): @abstractmethod def consume_measurement(self, measurement: Measurement) -> None: pass @abstractmethod def register_asynchronous_instrument( self, instrument: ( "mysql.opentelemetry.sdk.metrics._internal.instrument_Asynchronous" ), ): pass @abstractmethod def collect( self, metric_reader: "mysql.opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, ) -> Iterable[Metric]: pass class SynchronousMeasurementConsumer(MeasurementConsumer): def __init__( self, sdk_config: "mysql.opentelemetry.sdk.metrics._internal.SdkConfiguration", ) -> None: self._lock = Lock() self._sdk_config = sdk_config # should never be mutated self._reader_storages: Mapping[ "mysql.opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage ] = { reader: MetricReaderStorage( sdk_config, reader._instrument_class_temporality, reader._instrument_class_aggregation, ) for reader in sdk_config.metric_readers } self._async_instruments: List[ "mysql.opentelemetry.sdk.metrics._internal.instrument._Asynchronous" ] = [] def consume_measurement(self, measurement: Measurement) -> None: for reader_storage in self._reader_storages.values(): reader_storage.consume_measurement(measurement) def register_asynchronous_instrument( self, instrument: ( "mysql.opentelemetry.sdk.metrics._internal.instrument._Asynchronous" ), ) -> None: with self._lock: self._async_instruments.append(instrument) def collect( self, metric_reader: "mysql.opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, ) -> Iterable[Metric]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] # for now, just use the defaults callback_options = CallbackOptions() deadline_ns = time_ns() + timeout_millis * 10**6 default_timeout_millis = 10000 * 10**6 for async_instrument in self._async_instruments: remaining_time = deadline_ns - time_ns() if remaining_time < default_timeout_millis: callback_options = CallbackOptions(timeout_millis=remaining_time) measurements = async_instrument.callback(callback_options) if time_ns() >= deadline_ns: raise MetricsTimeoutError("Timed out while executing callback") for measurement in measurements: metric_reader_storage.consume_measurement(measurement) return self._reader_storages[metric_reader].collect()