HEX
Server: LiteSpeed
System: Linux atali.colombiahosting.com.co 5.14.0-570.12.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue May 13 06:11:55 EDT 2025 x86_64
User: coopserp (1713)
PHP: 8.2.29
Disabled: dl,exec,passthru,proc_open,proc_close,shell_exec,memory_limit,system,popen,curl_multi_exec,show_source,symlink,link,leak,listen,diskfreespace,tmpfile,ignore_user_abord,highlight_file,source,show_source,fpaththru,virtual,posix_ctermid,posix_getcwd,posix_getegid,posix_geteuid,posix_getgid,posix_getgrgid,posix_getgrnam,posix_getgroups,posix_getlogin,posix_getpgid,posix_getpgrp,posix_getpid,posix,posix_getppid,posix_getpwnam,posix_getpwuid,posix_getrlimit,posix_getsid,posix_getuid,posix_isatty,posix_kill,posix_mkfifo,posix_setegid,posix_seteuid,posix_setgid,posix_setpgid,posix_setsid,posix_setid,posix_times,posix_ttyname,posix_uname,proc_get_status,proc_nice,proc_terminate
Upload Files
File: //usr/local/lib64/python3.9/site-packages/psutil/tests/__pycache__/test_connections.cpython-39.pyc
a

��?hS�@s\dZddlZddlZddlZddlmZddlmZddlmZddlmZddlm	Z	ddl
Z
ddl
mZdd	l
mZdd
l
m
Z
ddl
mZddl
mZdd
l
mZddl
mZddl
mZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlm Z ddlm!Z!ddlm"Z"ddlm#Z#ddlm$Z$ddlm%Z%dd lm&Z&dd!lm'Z'e(ed"e)��Z*d#d$�Z+e!j,j-d%d&�Gd'd(�d(e��Z.Gd)d*�d*e.�Z/e!j,j-d%d&�Gd+d,�d,e.��Z0e!j,j-d%d&�Gd-d.�d.e.��Z1Gd/d0�d0e.�Z2e!j,j3ed1d2�Gd3d4�d4e.��Z4Gd5d6�d6e�Z5dS)7zFTests for psutil.net_connections() and Process.net_connections() APIs.�N)�closing)�AF_INET)�AF_INET6)�
SOCK_DGRAM)�SOCK_STREAM)�FREEBSD)�LINUX)�MACOS)�NETBSD)�OPENBSD)�POSIX)�SUNOS)�WINDOWS)�
supports_ipv6)�PY3)�AF_UNIX)�HAS_NET_CONNECTIONS_UNIX)�SKIP_SYSCONS)�PsutilTestCase)�bind_socket)�bind_unix_socket)�check_connection_ntuple)�create_sockets)�filter_proc_net_connections)�pytest)�
reap_children)�retry_on_failure)�skip_on_access_denied)�tcp_socketpair)�unix_socketpair)�
wait_for_file�SOCK_SEQPACKETcCs$t��j|d�}|dvr t|�S|S)N��kind)�all�unix)�psutil�Process�net_connectionsr)r#�cons�r*�I/usr/local/lib64/python3.9/site-packages/psutil/tests/test_connections.py�this_proc_net_connections2sr,�serial)�namec@s&eZdZdd�Zdd�Zd	dd�ZdS)
�ConnectionTestCasecCstdd�gksJ�dS�Nr$r"�r,��selfr*r*r+�setUp;szConnectionTestCase.setUpcCstdd�gksJ�dSr0r1r2r*r*r+�tearDown>szConnectionTestCase.tearDownr$csdztj|d�}Wn tjy0tr*YdS�Yn0�fdd�|D�}|��|��||ks`J�dS)z�Given a process PID and its list of connections compare
        those against system-wide connections retrieved via
        psutil.net_connections.
        r"Ncs"g|]}|j�kr|dd��qS)N�����pid��.0�cr7r*r+�
<listcomp>Q�zBConnectionTestCase.compare_procsys_connections.<locals>.<listcomp>)r&r(ZAccessDeniedr	�sort)r3r8Z	proc_consr#Zsys_consr*r7r+�compare_procsys_connectionsBsz.ConnectionTestCase.compare_procsys_connectionsN)r$)�__name__�
__module__�__qualname__r4r5r?r*r*r*r+r/9sr/c@s4eZdZejjedd�dd��Zdd�Zdd�Z	d	S)
�TestBasicOperations�
requires root��reasoncCsDt��*tjdd�D]}t|�qWd�n1s60YdSr0)rr&r(r�r3�connr*r*r+�test_systemXszTestBasicOperations.test_systemcCsBt��(tdd�D]}t|�qWd�n1s40YdSr0)rr,rrGr*r*r+�test_process^sz TestBasicOperations.test_processcCsnt�t��tdd�Wd�n1s*0Yt�t��tjdd�Wd�n1s`0YdS)Nz???r")rZraises�
ValueErrorr,r&r(r2r*r*r+�test_invalid_kindcs(z%TestBasicOperations.test_invalid_kindN)
r@rArBr�mark�skipifrrIrJrLr*r*r*r+rCWs
rCc@s�eZdZdZdd�Zdd�Zdd�Zejj	e
�dd	�d
d��Zdd
�Zejj	e
�dd	�dd��Z
ejj	edd	�dd��Zejj	edd	�dd��ZdS)�TestUnconnectedSocketsz;Tests sockets which are open but not connected to anything.cCsttdd�}tdd�|D��}ts$tr0||��St|�dks@J�|djdkrh||��j|��kshJ�|dSdS)Nr$r"cSsg|]}|j|f�qSr*)�fdr9r*r*r+r<pr=z=TestUnconnectedSockets.get_conn_from_sock.<locals>.<listcomp>�rr6)r,�dictr
r�fileno�lenrP)r3�sockr)Zsmapr*r*r+�get_conn_from_sockns
z)TestUnconnectedSockets.get_conn_from_sockcCs�|�|�}t|�|jdkr.|j|��ks.J�|j|jks>J�|j|�tjtj	�ksXJ�|�
�}|sztrzt|t
�rz|��}|jtkr�|dd�}|j|ks�J�|jtkr�tr�tdd�}|jt��|dd�|S)z�Given a socket, makes sure it matches the one obtained
        via psutil. It assumes this process created one connection
        only (the one supposed to be checked).
        r6N�r$r")rVrrPrS�family�type�
getsockopt�socket�
SOL_SOCKET�SO_TYPE�getsocknamer�
isinstance�bytes�decoder�laddrrrr,r?�os�getpid)r3rUrHrbr)r*r*r+�check_socket{s 



z#TestUnconnectedSockets.check_socketcCsbd}tttt|d���8}|�|�}|jdks0J�|jtjks@J�Wd�n1sT0YdS�N��	127.0.0.1r��addrr*)	rrrrre�raddr�statusr&�CONN_LISTEN�r3rjrUrHr*r*r+�test_tcp_v4�s

z"TestUnconnectedSockets.test_tcp_v4zIPv6 not supportedrEcCsbd}tttt|d���8}|�|�}|jdks0J�|jtjks@J�Wd�n1sT0YdS�N)�::1rrir*)	rrrrrerkrlr&rmrnr*r*r+�test_tcp_v6�s

z"TestUnconnectedSockets.test_tcp_v6cCsbd}tttt|d���8}|�|�}|jdks0J�|jtjks@J�Wd�n1sT0YdSrf)	rrrrrerkrlr&�	CONN_NONErnr*r*r+�test_udp_v4�s

z"TestUnconnectedSockets.test_udp_v4cCsbd}tttt|d���8}|�|�}|jdks0J�|jtjks@J�Wd�n1sT0YdSrp)	rrrrrerkrlr&rsrnr*r*r+�test_udp_v6�s

z"TestUnconnectedSockets.test_udp_v6�
POSIX onlycCsd|��}tt|td���8}|�|�}|jdks2J�|jtjksBJ�Wd�n1sV0YdS�N)rY��	�
get_testfnrrrrerkrlr&rs�r3�testfnrUrHr*r*r+�
test_unix_tcp�s

z$TestUnconnectedSockets.test_unix_tcpcCsd|��}tt|td���8}|�|�}|jdks2J�|jtjksBJ�Wd�n1sV0YdSrwryr{r*r*r+�
test_unix_udp�s

z$TestUnconnectedSockets.test_unix_udpN)r@rArB�__doc__rVrerorrMrNrrrrtrurr}r~r*r*r*r+rOjs



rOc@sBeZdZdZejjedd�dd��Zejje	dd�dd��Z
d	S)
�TestConnectedSocketzFTest socket pairs which are actually connected to
    each other.
    zunreliable on SUONSrEcCs�d}tdd�gksJ�tt|d�\}}zVtdd�}t|�dksBJ�|djtjksVJ�|djtjksjJ�W|��|��n|��|��0dS)Nrg�tcp4r"rirWrrQ)r,rrrTrlr&ZCONN_ESTABLISHED�close)r3rj�server�clientr)r*r*r+�test_tcp�s

�zTestConnectedSocket.test_tcprvcCs|��}t|�\}}z�tdd�}|djr<|djr<J|��|djrX|djrXJ|��ts`trndd�|D�}t|�dks~J�ts�ts�t	s�t
r�|djdks�J�|djdks�J�||djp�|djks�J�n|djp�|dj|ks�J�W|��|��n|��|��0dS)	Nr%r"rrQcSsg|]}|jdkr|�qS)z/var/run/log)rkr9r*r*r+r<�r=z1TestConnectedSocket.test_unix.<locals>.<listcomp>rWrx)rzrr,rbrkr
rrTrr
rr�)r3r|r�r�r)r*r*r+�	test_unix�s$

�zTestConnectedSocket.test_unixN)r@rArBrrrMrNr
r�rr�r*r*r*r+r��s

r�c@s.eZdZdd�Zeed�dd��Zdd�ZdS)	�TestFilterscCs�dd�}t���|dtttgtttg�|dttgttg�|dtgttg�|dttgtg�|dtgtg�|dtgtg�|d	ttgtg�|d
tgtg�|dtgtg�tr�|dtgtttg�Wd�n1s�0YdS)
NcSsbt|d�D] }|j|vsJ�|j|vs
J�q
ts^tj|d�D] }|j|vsNJ�|j|vs<J�q<dS)Nr")r,rXrYrr&r()r#�families�typesrHr*r*r+�checksz'TestFilters.test_filters.<locals>.checkr$�inet�inet4�tcpr��tcp6�udp�udp4�udp6r%)rrrrrrr!r)r3r�r*r*r+�test_filterss*	��zTestFilters.test_filters)Zonly_ifcs�t��fdd�}t�d�}t�d�}tj��jt��d��}|jt	t
�d|d�}|jt	t
�d|d�}|jt	t�d|d�}|jt	t�d|d�}��|�}	t
t|d	d
��}
��|�}t
t|d	d
��}t�r���|�}
t
t|d	d
��}��|�}t
t|d	d
��}nd}
d}d}d}t����D]�}|��}t|�dk�s8J�|D]�}|j|	jk�rh|||t
t|
dtjd
�n�|j|jk�r�|||t
t|dtjd�nZ|jt|
dd�k�r�|||tt|dtjd�n,|jt|dd�k�r<|||tt|dtjd��q<�qdS)Ncs�d}t|�|j|ksJ�|j|ks(J�|j|ks6J�|j|ksDJ�|j|ksRJ�|D]2}	|j|	d�}
|	|vr||
gks�J�qV|
gksVJ�qVtr���|j	|g�dS)N)
r$r�r��inet6r�r�r�r�r�r�r")
rrXrYrbrkrlr(rr?r8)�procrHrXrYrbrkrl�kindsZ	all_kindsr#r)r2r*r+�
check_conn$sz+TestFilters.test_combos.<locals>.check_conna4
            import socket, time
            s = socket.socket({family}, socket.SOCK_STREAM)
            s.bind(('{addr}', 0))
            s.listen(5)
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            [time.sleep(0.1) for x in range(100)]
            a
            import socket, time
            s = socket.socket({family}, socket.SOCK_DGRAM)
            s.bind(('{addr}', 0))
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            [time.sleep(0.1) for x in range(100)]
            )�dirrh)rXrjr|rqT)�deleterQr*)r$r�r�r�r�)r$r�r�r�r�r8)r$r�r�r�r�)r$r�r�r�r�)r�textwrap�dedentrc�path�basenamerz�getcwd�format�intrr�pyrun�evalr rr&r'�childrenr(rTr8rrmrrs�getattr)r3r�Ztcp_templateZudp_templateZtestfileZ
tcp4_templateZ
udp4_templateZ
tcp6_templateZ
udp6_templateZ	tcp4_procZ	tcp4_addrZ	udp4_procZ	udp4_addrZ	tcp6_procZ	tcp6_addrZ	udp6_procZ	udp6_addr�pr)rHr*r2r+�test_combos s�




�
�
�
�



����zTestFilters.test_comboscCs�t����tdd�}t|�t�r$dndks.J�|D]$}|jttfvsHJ�|jtks2J�q2tdd�}t|�dksrJ�|djtks�J�|djtks�J�t�r�tdd�}t|�dks�J�|djtks�J�|djtks�J�tdd�}t|�t�r�dndks�J�|D]*}|jttfv�sJ�|jt	k�sJ��qtd	d�}t|�dk�sJJ�|djtk�s^J�|djt	k�srJ�t��r�td
d�}t|�dk�s�J�|djtk�s�J�|djt	k�s�J�tdd�}t|�t��r�dndk�s�J�|D].}|jttfv�sJ�|jtt	fv�s�J��q�t��rntd
d�}t|�dk�s>J�|D]*}|jtk�sVJ�|jtt	fv�sBJ��qBt
�r�t�s�t�s�tdd�}t|�dk�s�J�|D]*}|jt
k�s�J�|jtt	fv�s�J��q�Wd�n1�s�0YdS)Nr�r"rWrQr�rr�r�r�r�r��r�r%�)rr,rTrrXrrrYrrrrr
r)r3r)rHr*r*r+�
test_count�s\









zTestFilters.test_countN)r@rArBr�rr	r�r�r*r*r*r+r�s

r�rDrEc@s&eZdZdZdd�Ze�dd��ZdS)�TestSystemWideConnectionszTests for net_connections().cCs�dd�}t��rddlm}|��D]L\}}|dkr:ts:q$|\}}t�|�}t|�tt|��ksdJ�||||�q$Wd�n1s�0YdS)NcSs<|D]2}|j|vsJ�|jtkr.|j|vs.J�t|�qdS)N)rXrrYr)r)r��types_rHr*r*r+r��s

z0TestSystemWideConnections.test_it.<locals>.checkr)�	conn_tmapr%)	r�psutil._commonr��itemsrr&r(rT�set)r3r�r�r#�groupsr�r�r)r*r*r+�test_it�s
z!TestSystemWideConnections.test_itcs�t��}t|�}Wd�n1s$0Yg�d}g}t|�D]:}|��}|�|�t�d|�}|�|�}��|j�qB|D]}t	|�q��fdd�t
jdd�D�}	�D]B�t�fdd�|	D��|ks�J�t
���}
t|
�d��|ks�J�q�dS)N�
a"                import time, os
                from psutil.tests import create_sockets
                with create_sockets():
                    with open(r'%s', 'w') as f:
                        f.write("hello")
                    [time.sleep(0.1) for x in range(100)]
                csg|]}|j�vr|�qSr*r7�r:�x)�pidsr*r+r<szFTestSystemWideConnections.test_multi_sockets_procs.<locals>.<listcomp>r$r"csg|]}|j�kr|�qSr*r7r�r7r*r+r<!r=)
rrT�rangerz�appendr�r�r�r8r r&r(r')r3�socks�expected�times�fnames�_�fname�srcZsprocZsysconsr�r*)r8r�r+�test_multi_sockets_procs�s,&
�



�
z2TestSystemWideConnections.test_multi_sockets_procsN)r@rArBrr�rr�r*r*r*r+r��sr�c@seZdZdd�ZdS)�TestMisccCs�g}g}tt�D]\}|�d�rtt|�}t|�}|��s@J|��t|vsLJ�||vsXJ�|�|�|�|�qtr~tjtj	t
r�tjdS)NZCONN_)r�r&�
startswithr��str�isupperr�r
Z	CONN_IDLEZ
CONN_BOUNDrZCONN_DELETE_TCB)r3Zints�strsr.�numZstr_r*r*r+�test_net_connection_constants's 


z&TestMisc.test_net_connection_constantsN)r@rArBr�r*r*r*r+r�&sr�)6rrcr[r��
contextlibrrrrrr&rrr	r
rrr
rr�rZpsutil._compatrZpsutil.testsrrrrrrrrrrrrrrrr r��objectr!r,rMZxdist_groupr/rCrOr�r�rNr�r�r*r*r*r+�<module>sb]7g?