
    f_A                     X   d Z ddlZddlZddlZddlZddlZ	 ddlZddl	Z	ddl
Z	ddlZ	ddlZddlmZ ddlmZ ddlmZmZ ddlmZmZ  ej0                  e      Z G d de      Z G d	 d
e      Z G d de      Z G d de      Z G d de      Z  G d de      Z!y# e$ r ddlZY w xY w)z Use pika with the Gevent IOLoop.    N)BaseConnection)check_callback_arg)AbstractIOReferenceAbstractIOServices)AbstractSelectorIOLoopSelectorIOServicesAdapterc                   F     e Zd ZdZ	 	 	 	 	 	 d fd	Ze	 	 dd       Z xZS )GeventConnectionzwImplementation of pika's ``BaseConnection``.

    An async selector-based connection which integrates with Gevent.
    c                     t         j                  j                  rt        d      |xs t	        t        j                               }t        |t              r|}nt        |      }t        t        | 3  ||||||       y)a  Create a new GeventConnection instance and connect to RabbitMQ on
        Gevent's event-loop.

        :param pika.connection.Parameters|None parameters: The connection
            parameters
        :param callable|None on_open_callback: The method to call when the
            connection is open
        :param callable|None on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`:
            on_open_error_callback(Connection, exception)
        :param callable|None on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the
            cause of connection failure
        :param gevent._interfaces.ILoop|nbio_interface.AbstractIOServices|None
            custom_ioloop: Use a custom Gevent ILoop.
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory
        z-GeventConnection is not supported on Windows.)internal_connection_workflowN)pikacompat
ON_WINDOWSRuntimeError_GeventSelectorIOLoopgeventget_hub
isinstancer    _GeventSelectorIOServicesAdaptersuperr
   __init__)	self
parameterson_open_callbackon_open_error_callbackon_close_callbackcustom_ioloopr   nbio	__class__s	           Z/var/www/cs2snipe.com/venv/lib/python3.12/site-packages/pika/adapters/gevent_connection.pyr   zGeventConnection.__init__'   s    < ;;!!NOO& A.v~~/?@ 	 m%78 D3MBD.")E 	/ 	G    c                      |xs t        t        j                               }t        |       fd} j	                  ||||      S )z_Implement
        :py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
        c                 4    | t        d       | d      S )zConnection factory.zIExpected pika.connection.Parameters instance, but got None in params arg.F)r   r   r   )
ValueError)paramsclsr   s    r    connection_factoryz>GeventConnection.create_connection.<locals>.connection_factoryf   s2    ~  "I J J&%)49; ;r!   )connection_configsr'   r   workflowon_done)r   r   r   r   _start_connection_workflow)r&   r(   r*   r   r)   r'   r   s   `     @r    create_connectionz"GeventConnection.create_connectionX   s\     ' A.v~~/?@ 	 0>	; --11 .  	r!   )NNNNNT)NN)__name__
__module____qualname____doc__r   classmethodr,   __classcell__)r   s   @r    r
   r
   !   s?     !"&(,#'#.2/Gb  )-#'	 r!   r
   c                   2    e Zd ZdZd Zed        Zd Zd Zy)_TSafeCallbackQueueziDispatch callbacks from any thread to be executed in the main thread
    efficiently with IO events.
    c                     t        j                         | _        t        j                         \  | _        | _        t        j                         | _	        y)zQ
        :param _GeventSelectorIOLoop loop: IO loop to add callbacks to.
        N)
queueQueue_queueospipe_read_fd	_write_fd	threadingRLock_write_lockr   s    r    r   z_TSafeCallbackQueue.__init__|   s6    
 kkm(*	%t~ %??,r!   c                     | j                   S )z?The file-descriptor to register for READ events in the IO loop.)r;   r@   s    r    fdz_TSafeCallbackQueue.fd   s     }}r!   c                     | j                   j                  |       | j                  5  t        j                  | j
                  d       ddd       y# 1 sw Y   yxY w)zAdd an item to the queue from any thread. The configured handler
        will be invoked with the item in the main thread.

        :param item: Object to add to the queue.
           N)r8   putr?   r9   writer<   r   callbacks     r    add_callback_threadsafez+_TSafeCallbackQueue.add_callback_threadsafe   sD     	! 	.HHT^^W-	. 	. 	.s   !AAc                     	 | j                   j                         }t        j                  | j                  d        |        y# t
        j                  $ r t        j                  d       Y yw xY w)a  Invoke the next callback from the queue.

        MUST run in the main thread. If no callback was added to the queue,
        this will block the IO loop.

        Performs a blocking READ on the pipe so must only be called when the
        pipe is ready for reading.
           zCallback queue was empty.N)	r8   
get_nowaitr9   readr;   r6   EmptyLOGGERwarningrG   s     r    run_next_callbackz%_TSafeCallbackQueue.run_next_callback   sT    	{{--/H GGDMM1%J {{ 	8NN67	8s   A (A/.A/N)	r-   r.   r/   r0   r   propertyrB   rI   rQ    r!   r    r4   r4   w   s*    
-  	.r!   r4   c                   Z    e Zd ZdZdZdZdZddZd Zd Z	d	 Z
d
 Zd Zd Zd Zd Zd Zy)r   zImplementation of `AbstractSelectorIOLoop` using the Gevent event loop.

    Required by implementations of `SelectorIOServicesAdapter`.
    rK      r   Nc                     |xs t        j                          _        i  _        t         j                  j                          _        t                _         fd} j                   j                  j                  | j                         y)z>
        :param gevent._interfaces.ILoop gevent_loop:
        c                 >    ~ ~j                   j                          y)z$Swallow the fd and events arguments.N)_callback_queuerQ   )rB   eventsr   s     r    run_callback_in_main_threadzC_GeventSelectorIOLoop.__init__.<locals>.run_callback_in_main_thread   s      224r!   N)r   r   _hub_io_watchers_by_fdhubWaiter_waiterr4   rX   add_handlerrB   READ)r   
gevent_hubrZ   s   `  r    r   z_GeventSelectorIOLoop.__init__   sn     2&.."2	"$zz((*  34	5 	--002M	$r!   c                 Z    | j                   j                  j                          d| _         y)zRelease the loop's resources.N)r[   loopdestroyr@   s    r    closez_GeventSelectorIOLoop.close   s    		 	r!   c                     t         j                  d       | j                  j                          t         j                  d       | j                  j	                          y)zNRun the I/O loop. It will loop until requested to exit. See `stop()`.
        z"Passing control to Gevent's IOLoopz,Control was passed back from Gevent's IOLoopN)rO   debugr_   getclearr@   s    r    startz_GeventSelectorIOLoop.start   s>     	9:CDr!   c                 :    | j                   j                  d       y)a$  Request exit from the ioloop. The loop is NOT guaranteed to
        stop before this method returns.

        To invoke `stop()` safely from a thread other than this IOLoop's thread,
        call it via `add_callback_threadsafe`; e.g.,

            `ioloop.add_callback(ioloop.stop)`
        N)r_   switchr@   s    r    stopz_GeventSelectorIOLoop.stop   s     	D!r!   c                    t        j                         | j                  k(  r;t        j	                  d       | j                  j
                  j                  |       yt        j	                  d       t        j                  | j                  j
                  j                  |      }| j                  j                  |       y)a  Requests a call to the given function as soon as possible in the
        context of this IOLoop's thread.

        NOTE: This is the only thread-safe method in IOLoop. All other
        manipulations of IOLoop must be performed from the IOLoop's thread.

        For example, a thread may request a call to the `stop` method of an
        ioloop that is running in a different thread via
        `ioloop.add_callback_threadsafe(ioloop.stop)`

        :param callable callback: The callback method
        z Adding callback from main threadz#Adding callback from another threadN)r   r   r[   rO   rh   rd   run_callback	functoolspartialrX   rI   rG   s     r    add_callbackz"_GeventSelectorIOLoop.add_callback   s}     >>tyy(LL;<IINN''1
 LL>? (()D)DhOH  88Br!   c                 r    | j                   j                  j                  |      }|j                  |       |S )a  Add the callback to the IOLoop timer to be called after delay seconds
        from the time of call on best-effort basis. Returns a handle to the
        timeout.

        :param float delay: The number of seconds to wait to call callback
        :param callable callback: The callback method
        :returns: handle to the created timeout that may be passed to
            `remove_timeout()`
        :rtype: object
        )r[   rd   timerrk   )r   delayrH   ru   s       r    
call_laterz _GeventSelectorIOLoop.call_later   s-     		$$U+Hr!   c                 $    |j                          y)zURemove a timeout

        :param timeout_handle: Handle of timeout to remove
        N)rf   )r   timeout_handles     r    remove_timeoutz$_GeventSelectorIOLoop.remove_timeout  s    
 	r!   c                     | j                   j                  j                  ||      }|| j                  |<   |j	                  |||       y)a  Start watching the given file descriptor for events

        :param int fd: The file descriptor
        :param callable handler: When requested event(s) occur,
            `handler(fd, events)` will be called.
        :param int events: The event mask (READ|WRITE)
        N)r[   rd   ior\   rk   )r   rB   handlerrY   
io_watchers        r    r`   z!_GeventSelectorIOLoop.add_handler  s@     YY^^&&r62
&0#"f-r!   c                     | j                   |   }|j                  }|j                          | j                   |= | j                  |||       y)zChange the events being watched for.

        :param int fd: The file descriptor
        :param int events: The new event mask (READ|WRITE)
        N)r\   rH   rf   r`   )r   rB   rY   r~   rH   s        r    update_handlerz$_GeventSelectorIOLoop.update_handler!  sM     ,,R0
 &&##B'Xv.r!   c                 \    | j                   |   }|j                          | j                   |= y)zgStop watching the given file descriptor for events

        :param int fd: The file descriptor
        N)r\   rf   )r   rB   r~   s      r    remove_handlerz$_GeventSelectorIOLoop.remove_handler/  s.    
 ,,R0
##B'r!   )N)r-   r.   r/   r0   ra   WRITEERRORr   rf   rk   rn   rs   rw   rz   r`   r   r   rS   r!   r    r   r      sK     DEE$*
	"C2
./(r!   r   c                        e Zd ZdZ	 	 	 	 ddZy)r   zESelectorIOServicesAdapter implementation using Gevent's DNS resolver.c           
      r    t        | j                  |||||||      }|j                          t        |      S )zOImplement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
        )native_loophostportfamilysocktypeprotoflagsr*   )_GeventAddressResolver_looprk   _GeventIOLoopIOHandle)	r   r   r   r*   r   r   r   r   resolvers	            r    getaddrinfoz,_GeventSelectorIOServicesAdapter.getaddrinfo<  s@     *djj/3/3173;050529; 	$X..r!   N)r   r   r   r   )r-   r.   r/   r0   r   rS   r!   r    r   r   9  s    O /r!   r   c                       e Zd ZdZd Zd Zy)r   zXImplement `AbstractIOReference`.

    Only used to wrap the _GeventAddressResolver.
    c                 &    |j                   | _        y)zY
        :param subject: subject of the reference containing a `cancel()` method
        N)cancel_cancel)r   subjects     r    r   z_GeventIOLoopIOHandle.__init__Y  s     ~~r!   c                 "    | j                         S )zCancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool
        )r   r@   s    r    r   z_GeventIOLoopIOHandle.cancel_  s     ||~r!   N)r-   r.   r/   r0   r   r   rS   r!   r    r   r   S  s    
&r!   r   c                   >    e Zd ZdZdZd Zd Zd Zd Zd Z	d Z
d	 Zy
)r   zPerforms getaddrinfo asynchronously Gevent's configured resolver in a
    separate greenlet and invoking the provided callback with the result.

    See: http://www.gevent.org/dns.html
    )	r   _on_done	_greenlet_ga_host_ga_port
_ga_family_ga_socktype	_ga_proto	_ga_flagsc	                     t        |d       || _        || _        d| _        || _        || _        || _        || _        || _        || _	        y)a  Initialize the `_GeventAddressResolver`.

        :param AbstractSelectorIOLoop native_loop:
        :param host: `see socket.getaddrinfo()`
        :param port: `see socket.getaddrinfo()`
        :param family: `see socket.getaddrinfo()`
        :param socktype: `see socket.getaddrinfo()`
        :param proto: `see socket.getaddrinfo()`
        :param flags: `see socket.getaddrinfo()`
        :param on_done: on_done(records|BaseException) callback for reporting
            result from the given I/O loop. The single arg will be either an
            exception object (check for `BaseException`) in case of failure or
            the result returned by `socket.getaddrinfo()`.
        r*   N)
r   r   r   r   r   r   r   r   r   r   )	r   r   r   r   r   r   r   r   r*   s	            r    r   z_GeventAddressResolver.__init__z  sN      	7I. 
 $r!   c                     | j                   %t        j                  | j                        | _         yt        j                  d       y)z-Start an asynchronous getaddrinfo invocation.Nz&_GeventAddressResolver already started)r   r   	spawn_raw_resolverO   rP   r@   s    r    rk   z_GeventAddressResolver.start  s/    >>!#--dmm<DNNNCDr!   c                 f    d}| j                   d}| j                          | j                          |S )zCancel the pending resolver.FT)r   _stop_greenlet_cleanup)r   changeds     r    r   z_GeventAddressResolver.cancel  s0    >>%G!r!   c                 @    | j                          d| _        d| _        y)z,Stop the resolver and release any resources.N)r   r   r   r@   s    r    r   z_GeventAddressResolver._cleanup  s    
r!   c                 j    | j                   't        j                  | j                          d| _         yy)zbStop the greenlet performing getaddrinfo if running.

        Otherwise, this is a no-op.
        N)r   r   killr@   s    r    r   z%_GeventAddressResolver._stop_greenlet  s*    
 >>%KK'!DN &r!   c                    	 t         j                  j                  | j                  | j                  | j
                  | j                  | j                  | j                        }t        j                  | j                  |      }| j                  j!                  |       y# t        $ r"}t        j                  d|       |}Y d}~bd}~ww xY w)zoCall `getaddrinfo()` and return result via user's callback
        function on the configured IO loop.
        zAddress resolution failed: %rN)r   socketr   r   r   r   r   r   r   	ExceptionrO   errorrq   rr   _dispatch_callbackr   rs   )r   resultexcrH   s       r    r   z_GeventAddressResolver._resolve  s    	]]..t}}dmm/3/3/@/@/3~~t~~OF $$T%<%<fE

)  	LL8#>F	s   A B 	C	'CC	c                     	 t         j                  d| j                         | j                  |       | j	                          y# | j	                          w xY w)zInvoke the configured completion callback and any subsequent cleanup.

        :param result: result from getaddrinfo, or the exception if raised.
        z9Invoking async getaddrinfo() completion callback; host=%rN)rO   rh   r   r   r   )r   r   s     r    r   z)_GeventAddressResolver._dispatch_callback  s>    
	LLK MM&!MMODMMOs   1A AN)r-   r.   r/   r0   	__slots__r   rk   r   r   r   r   r   rS   r!   r    r   r   h  s2    

I<E	"*"r!   r   )"r0   rq   loggingr9   r=   weakrefr6   ImportErrorr7   r   
gevent.hubgevent.socketpika.compatr   pika.adapters.base_connectionr   %pika.adapters.utils.io_services_utilsr   "pika.adapters.utils.nbio_interfacer   r   +pika.adapters.utils.selector_ioloop_adapterr   r   	getLoggerr-   rO   r
   objectr4   r   r   r   r   rS   r!   r    <module>r      s    &   	       8 D
 
		8	$S~ Sl2& 2jJ(2 J(Z/'@ /4/ *mV m{
  s   B 	B)(B)