vendor/predis/predis/src/Connection/StreamConnection.php line 258

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Predis package.
  4.  *
  5.  * (c) Daniele Alessandri <suppakilla@gmail.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Predis\Connection;
  11. use Predis\Command\CommandInterface;
  12. use Predis\Response\Error as ErrorResponse;
  13. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  14. use Predis\Response\Status as StatusResponse;
  15. /**
  16.  * Standard connection to Redis servers implemented on top of PHP's streams.
  17.  * The connection parameters supported by this class are:.
  18.  *
  19.  *  - scheme: it can be either 'redis', 'tcp', 'rediss', 'tls' or 'unix'.
  20.  *  - host: hostname or IP address of the server.
  21.  *  - port: TCP port of the server.
  22.  *  - path: path of a UNIX domain socket when scheme is 'unix'.
  23.  *  - timeout: timeout to perform the connection (default is 5 seconds).
  24.  *  - read_write_timeout: timeout of read / write operations.
  25.  *  - async_connect: performs the connection asynchronously.
  26.  *  - tcp_nodelay: enables or disables Nagle's algorithm for coalescing.
  27.  *  - persistent: the connection is left intact after a GC collection.
  28.  *  - ssl: context options array (see http://php.net/manual/en/context.ssl.php)
  29.  *
  30.  * @author Daniele Alessandri <suppakilla@gmail.com>
  31.  */
  32. class StreamConnection extends AbstractConnection
  33. {
  34.     /**
  35.      * Disconnects from the server and destroys the underlying resource when the
  36.      * garbage collector kicks in only if the connection has not been marked as
  37.      * persistent.
  38.      */
  39.     public function __destruct()
  40.     {
  41.         if (isset($this->parameters->persistent) && $this->parameters->persistent) {
  42.             return;
  43.         }
  44.         $this->disconnect();
  45.     }
  46.     /**
  47.      * {@inheritdoc}
  48.      */
  49.     protected function assertParameters(ParametersInterface $parameters)
  50.     {
  51.         switch ($parameters->scheme) {
  52.             case 'tcp':
  53.             case 'redis':
  54.             case 'unix':
  55.                 break;
  56.             case 'tls':
  57.             case 'rediss':
  58.                 $this->assertSslSupport($parameters);
  59.                 break;
  60.             default:
  61.                 throw new \InvalidArgumentException("Invalid scheme: '$parameters->scheme'.");
  62.         }
  63.         return $parameters;
  64.     }
  65.     /**
  66.      * Checks needed conditions for SSL-encrypted connections.
  67.      *
  68.      * @param ParametersInterface $parameters Initialization parameters for the connection.
  69.      *
  70.      * @throws \InvalidArgumentException
  71.      */
  72.     protected function assertSslSupport(ParametersInterface $parameters)
  73.     {
  74.         if (
  75.             filter_var($parameters->persistentFILTER_VALIDATE_BOOLEAN) &&
  76.             version_compare(PHP_VERSION'7.0.0beta') < 0
  77.         ) {
  78.             throw new \InvalidArgumentException('Persistent SSL connections require PHP >= 7.0.0.');
  79.         }
  80.     }
  81.     /**
  82.      * {@inheritdoc}
  83.      */
  84.     protected function createResource()
  85.     {
  86.         switch ($this->parameters->scheme) {
  87.             case 'tcp':
  88.             case 'redis':
  89.                 return $this->tcpStreamInitializer($this->parameters);
  90.             case 'unix':
  91.                 return $this->unixStreamInitializer($this->parameters);
  92.             case 'tls':
  93.             case 'rediss':
  94.                 return $this->tlsStreamInitializer($this->parameters);
  95.             default:
  96.                 throw new \InvalidArgumentException("Invalid scheme: '{$this->parameters->scheme}'.");
  97.         }
  98.     }
  99.     /**
  100.      * Creates a connected stream socket resource.
  101.      *
  102.      * @param ParametersInterface $parameters Connection parameters.
  103.      * @param string              $address    Address for stream_socket_client().
  104.      * @param int                 $flags      Flags for stream_socket_client().
  105.      *
  106.      * @return resource
  107.      */
  108.     protected function createStreamSocket(ParametersInterface $parameters$address$flags)
  109.     {
  110.         $timeout = (isset($parameters->timeout) ? (float) $parameters->timeout 5.0);
  111.         if (!$resource = @stream_socket_client($address$errno$errstr$timeout$flags)) {
  112.             $this->onConnectionError(trim($errstr), $errno);
  113.         }
  114.         if (isset($parameters->read_write_timeout)) {
  115.             $rwtimeout = (float) $parameters->read_write_timeout;
  116.             $rwtimeout $rwtimeout $rwtimeout : -1;
  117.             $timeoutSeconds floor($rwtimeout);
  118.             $timeoutUSeconds = ($rwtimeout $timeoutSeconds) * 1000000;
  119.             stream_set_timeout($resource$timeoutSeconds$timeoutUSeconds);
  120.         }
  121.         if (isset($parameters->tcp_nodelay) && function_exists('socket_import_stream')) {
  122.             $socket socket_import_stream($resource);
  123.             socket_set_option($socketSOL_TCPTCP_NODELAY, (int) $parameters->tcp_nodelay);
  124.         }
  125.         return $resource;
  126.     }
  127.     /**
  128.      * Initializes a TCP stream resource.
  129.      *
  130.      * @param ParametersInterface $parameters Initialization parameters for the connection.
  131.      *
  132.      * @return resource
  133.      */
  134.     protected function tcpStreamInitializer(ParametersInterface $parameters)
  135.     {
  136.         if (!filter_var($parameters->hostFILTER_VALIDATE_IPFILTER_FLAG_IPV6)) {
  137.             $address "tcp://$parameters->host:$parameters->port";
  138.         } else {
  139.             $address "tcp://[$parameters->host]:$parameters->port";
  140.         }
  141.         $flags STREAM_CLIENT_CONNECT;
  142.         if (isset($parameters->async_connect) && $parameters->async_connect) {
  143.             $flags |= STREAM_CLIENT_ASYNC_CONNECT;
  144.         }
  145.         if (isset($parameters->persistent)) {
  146.             if (false !== $persistent filter_var($parameters->persistentFILTER_VALIDATE_BOOLEANFILTER_NULL_ON_FAILURE)) {
  147.                 $flags |= STREAM_CLIENT_PERSISTENT;
  148.                 if ($persistent === null) {
  149.                     $address "{$address}/{$parameters->persistent}";
  150.                 }
  151.             }
  152.         }
  153.         $resource $this->createStreamSocket($parameters$address$flags);
  154.         return $resource;
  155.     }
  156.     /**
  157.      * Initializes a UNIX stream resource.
  158.      *
  159.      * @param ParametersInterface $parameters Initialization parameters for the connection.
  160.      *
  161.      * @return resource
  162.      */
  163.     protected function unixStreamInitializer(ParametersInterface $parameters)
  164.     {
  165.         if (!isset($parameters->path)) {
  166.             throw new \InvalidArgumentException('Missing UNIX domain socket path.');
  167.         }
  168.         $flags STREAM_CLIENT_CONNECT;
  169.         if (isset($parameters->persistent)) {
  170.             if (false !== $persistent filter_var($parameters->persistentFILTER_VALIDATE_BOOLEANFILTER_NULL_ON_FAILURE)) {
  171.                 $flags |= STREAM_CLIENT_PERSISTENT;
  172.                 if ($persistent === null) {
  173.                     throw new \InvalidArgumentException(
  174.                         'Persistent connection IDs are not supported when using UNIX domain sockets.'
  175.                     );
  176.                 }
  177.             }
  178.         }
  179.         $resource $this->createStreamSocket($parameters"unix://{$parameters->path}"$flags);
  180.         return $resource;
  181.     }
  182.     /**
  183.      * Initializes a SSL-encrypted TCP stream resource.
  184.      *
  185.      * @param ParametersInterface $parameters Initialization parameters for the connection.
  186.      *
  187.      * @return resource
  188.      */
  189.     protected function tlsStreamInitializer(ParametersInterface $parameters)
  190.     {
  191.         $resource $this->tcpStreamInitializer($parameters);
  192.         $metadata stream_get_meta_data($resource);
  193.         // Detect if crypto mode is already enabled for this stream (PHP >= 7.0.0).
  194.         if (isset($metadata['crypto'])) {
  195.             return $resource;
  196.         }
  197.         if (is_array($parameters->ssl)) {
  198.             $options $parameters->ssl;
  199.         } else {
  200.             $options = array();
  201.         }
  202.         if (!isset($options['crypto_type'])) {
  203.             $options['crypto_type'] = STREAM_CRYPTO_METHOD_TLS_CLIENT;
  204.         }
  205.         if (!stream_context_set_option($resource, array('ssl' => $options))) {
  206.             $this->onConnectionError('Error while setting SSL context options');
  207.         }
  208.         if (!stream_socket_enable_crypto($resourcetrue$options['crypto_type'])) {
  209.             $this->onConnectionError('Error while switching to encrypted communication');
  210.         }
  211.         return $resource;
  212.     }
  213.     /**
  214.      * {@inheritdoc}
  215.      */
  216.     public function connect()
  217.     {
  218.         if (parent::connect() && $this->initCommands) {
  219.             foreach ($this->initCommands as $command) {
  220.                 $response $this->executeCommand($command);
  221.                 if ($response instanceof ErrorResponseInterface) {
  222.                     $this->onConnectionError("`{$command->getId()}` failed: $response"0);
  223.                 }
  224.             }
  225.         }
  226.     }
  227.     /**
  228.      * {@inheritdoc}
  229.      */
  230.     public function disconnect()
  231.     {
  232.         if ($this->isConnected()) {
  233.             fclose($this->getResource());
  234.             parent::disconnect();
  235.         }
  236.     }
  237.     /**
  238.      * Performs a write operation over the stream of the buffer containing a
  239.      * command serialized with the Redis wire protocol.
  240.      *
  241.      * @param string $buffer Representation of a command in the Redis wire protocol.
  242.      */
  243.     protected function write($buffer)
  244.     {
  245.         $socket $this->getResource();
  246.         while (($length strlen($buffer)) > 0) {
  247.             $written = @fwrite($socket$buffer);
  248.             if ($length === $written) {
  249.                 return;
  250.             }
  251.             if ($written === false || $written === 0) {
  252.                 $this->onConnectionError('Error while writing bytes to the server.');
  253.             }
  254.             $buffer substr($buffer$written);
  255.         }
  256.     }
  257.     /**
  258.      * {@inheritdoc}
  259.      */
  260.     public function read()
  261.     {
  262.         $socket $this->getResource();
  263.         $chunk fgets($socket);
  264.         if ($chunk === false || $chunk === '') {
  265.             $this->onConnectionError('Error while reading line from the server.');
  266.         }
  267.         $prefix $chunk[0];
  268.         $payload substr($chunk1, -2);
  269.         switch ($prefix) {
  270.             case '+':
  271.                 return StatusResponse::get($payload);
  272.             case '$':
  273.                 $size = (int) $payload;
  274.                 if ($size === -1) {
  275.                     return;
  276.                 }
  277.                 $bulkData '';
  278.                 $bytesLeft = ($size += 2);
  279.                 do {
  280.                     $chunk fread($socketmin($bytesLeft4096));
  281.                     if ($chunk === false || $chunk === '') {
  282.                         $this->onConnectionError('Error while reading bytes from the server.');
  283.                     }
  284.                     $bulkData .= $chunk;
  285.                     $bytesLeft $size strlen($bulkData);
  286.                 } while ($bytesLeft 0);
  287.                 return substr($bulkData0, -2);
  288.             case '*':
  289.                 $count = (int) $payload;
  290.                 if ($count === -1) {
  291.                     return;
  292.                 }
  293.                 $multibulk = array();
  294.                 for ($i 0$i $count; ++$i) {
  295.                     $multibulk[$i] = $this->read();
  296.                 }
  297.                 return $multibulk;
  298.             case ':':
  299.                 $integer = (int) $payload;
  300.                 return $integer == $payload $integer $payload;
  301.             case '-':
  302.                 return new ErrorResponse($payload);
  303.             default:
  304.                 $this->onProtocolError("Unknown response prefix: '$prefix'.");
  305.                 return;
  306.         }
  307.     }
  308.     /**
  309.      * {@inheritdoc}
  310.      */
  311.     public function writeRequest(CommandInterface $command)
  312.     {
  313.         $commandID $command->getId();
  314.         $arguments $command->getArguments();
  315.         $cmdlen strlen($commandID);
  316.         $reqlen count($arguments) + 1;
  317.         $buffer "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandID}\r\n";
  318.         foreach ($arguments as $argument) {
  319.             $arglen strlen(strval($argument));
  320.             $buffer .= "\${$arglen}\r\n{$argument}\r\n";
  321.         }
  322.         $this->write($buffer);
  323.     }
  324. }