在terminal下面运行
./yii test/a-m-q-p
yii TestController
params['amqp']); exit(); echo "console test/amqp!\n"; $res = Pusher::pushSMSToMQ($mobile='13692177080' // , $sms_msg='【运呗】验证码1234,您正在注册成为ooo用户,感谢您的支持!' // , $msg_platform='clysms' , $sms_msg='【运呗】aseqwrqwerqwe成立业短信平台 2.2.个性短信接口测试,感谢您的支持! 测试' , $msg_platform='clyafd_sale' , $related_uid=1 , $related_row_id=time() , $msg_datetime='' , $related_table='' ); var_dump($res); var_dump(Pusher::getError()); return 0; }}
Pusher.php
*/namespace app\lib\AMQP;class Pusher { static public $msg_check_key_prefix = 'push_to_mq:'; static private $error = ''; /** * 获取最近一次错误 */ static public function getError() { return self::$error; } /** * 短信 发送MQ静态方法 * * @param string $moible 接受短信的手机号码 * @param string $sms_msg 短信内容,内容格式为,字符串,或者json {"temp_name":"temp_name","params":{"key1":"val1","key2":"val2"}} * @param string $msg_platform 消息平台,指定用哪个平台发送消息 * @param int $related_uid 关联业务UID * @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性 * @param datetime $msg_datetime 定时时间,为空时表示立即发送(选填) 格式:yyyy-MM-dd HH:mm:ss * @param string $related_table 关联数据库表名称 * * @access public * @author leeyi* @return string */ static public function pushSMSToMQ($mobile='' , $sms_msg='' , $msg_platform='' , $related_uid=0 , $related_row_id=0 , $msg_datetime='' , $related_table='' ) { self::$error = ''; if( !self::isMobile($mobile) || empty($sms_msg) || empty($msg_platform) || $related_uid<0 || $related_row_id<0 ) { self::$error = '参数不合法'; return false; } $msg = []; $msg['msg_platform'] = $msg_platform; $msg['msg_datetime'] = $msg_datetime; $msg['mobile'] = $mobile; $msg['related_uid'] = intval($related_uid); $msg['related_row_id'] = intval($related_row_id); $msg['related_table'] = strval($related_table); $msg_md5 = md5(json_encode($msg)); // sms_msg 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化 $msg['sms_msg'] = $sms_msg; $message = ['msg_md5'=>$msg_md5, 'msg'=>$msg]; $res = self::checkMsg($msg_md5); if( $res ) { self::$error = '已经发送过了,请不要重复请求'; return false; } $mq = new Rabbit(); $res = $mq->sending($queue_name='push.sms', $message, $check_rule_time=time() ); if( $res ) { $cache_key = self::$msg_check_key_prefix.$msg_md5; //set方法的第一个参数是我们的数据对应的key值,方便我们获取到 //第二个参数即是我们要缓存的数据 //第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0 \Yii::$app->cache->set($cache_key, 1, 86400); } return $res; } /** * Email 发送MQ静态方法 * * @param string $to_email 收件人Email地址 * @param string $name 收件人称呼 * @param string $msg_title 消息标题 * @param string $msg_body 消息内容 * @param int $related_uid 关联业务UID * @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性 * * @access public * @author leeyi * @return string */ static public function pushEmailToMQ($to_email , $name , $msg_title , $msg_body , $related_uid=0 , $related_row_id=0 , $related_table='' ) { self::$error = ''; if( !self::isEmail($to_email) || empty($msg_title) || empty($msg_body) || $related_uid<0 || $related_row_id<0 ) { self::$error = '参数不合法'; return false; } $msg = []; $msg['to_email'] = $to_email; $msg['name'] = $name; $msg['msg_title'] = $msg_title; $msg['related_uid'] = intval($related_uid); $msg['related_row_id'] = intval($related_row_id); $msg['related_table'] = strval($related_table); $msg['msg_platform'] = 'email'; $msg_md5 = md5(json_encode($msg)); // msg_body 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化 $msg['msg_body'] = $msg_body; $message = ['msg_md5'=>$msg_md5, 'msg'=>$msg]; $res = self::checkMsg($msg_md5); if( $res ) { self::$error = '已经发送过了,请不要重复请求'; return false; } $mq = new Rabbit(); $res = $mq->sending($queue_name='push.email', $message, $check_rule_time=time() ); if( $res ) { $cache_key = self::$msg_check_key_prefix.$msg_md5; //set方法的第一个参数是我们的数据对应的key值,方便我们获取到 //第二个参数即是我们要缓存的数据 //第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0 \Yii::$app->cache->set($cache_key, 1, 86400); } return $res; } /** * Jpush 发送MQ静态方法 * * @param string $title 推送内容标题 * @param string $content 推送内容 * @param array $related_uids 关联业务UID 没有需要设置为 array(0) * @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性 * * @access public * @author leeyi * @return string */ static public function jpushToMQ($title , $content , $related_row_id=0 , $related_uids=[] , $platform='afd' ) { self::$error = ''; if( empty($content) || !is_array($related_uids) || $related_row_id<0 ) { self::$error = '参数不合法'; return false; } $msg = []; $msg['title'] = $title; $msg['related_uids'] = $related_uids; $msg['related_row_id'] = intval($related_row_id); $msg['platform'] = $platform; $msg_md5 = md5(json_encode($msg)); // content 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化 $msg['content'] = $content; $msg['msg_platform'] = 'jpush'; $message = array('msg_md5' => $msg_md5, 'msg' => $msg); $res = self::checkMsg($msg_md5); if( $res ) { self::$error = '已经发送过了,请不要重复请求'; return false; } $mq = new Rabbit(); $res = $mq->sending($queue_name='push.jpush', $message, $check_rule_time=time() ); if( $res ) { $cache_key = self::$msg_check_key_prefix.$msg_md5; //set方法的第一个参数是我们的数据对应的key值,方便我们获取到 //第二个参数即是我们要缓存的数据 //第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0 \Yii::$app->cache->set($cache_key, 1, 86400); } return $res; } /** * 检查消息是否发送过 * @param string $msg_md5 32位 md5,消息唯一标示 * * @access private * @author leeyi * @return boolean [true|false] 返回true 标示已经发送过消息了,不需要在发送了 */ static private function checkMsg($msg_md5) { $cache_key = self::$msg_check_key_prefix.$msg_md5; $count = \Yii::$app->cache->get($cache_key); return $count>0 ? true : false; } static private function isMobile($mobile) { return preg_match("/^1\d{10}$/", $mobile); } static private function isEmail($email) { $pattern = "/^([0-9A-Za-z\\-_\\.]+)@([0-9a-z]+\\.[a-z]{2,3}(\\.[a-z]{2})?)$/i"; return preg_match($pattern, $email); }}
Rabbit.php
*/namespace app\lib\AMQP;/** * 检查环境是否支持 */extension_loaded("amqp") or die("not extension amqp!");class Rabbit { /** * rabbitmq链接 * @name $connection */ public $connection=false; public $channel=false; public $error = ''; /** * 构造函数,初始化数据库链接等 * @author leeyi*/ public function __construct($options=array()) { $amqp = \Yii::$app->params['amqp']; if( empty($amqp) ) { exit('not config rabbitmq'); } $this->connection = $this->connect($options); $this->channel = new \AMQPChannel($this->connection); } /** * 获取最近一次错误 */ public function getError() { return $this->error; } /** * 持久化的消息发送 * @access public * @param string $queue_name 触发点标识(这里作为队列名称) * @param array $message 消息主体,一个多维数组,只包含元素 msg_md5 和 msg 两个元素,msg 可以是任意数组 * @param int $check_rule_time 触发点检查“奖励规则的当前时间“,作为 mq queue 的 timestamp attr * @return true * @author leeyi */ public function sending($queue_name, $message, $check_rule_time) { /** * 下面的 message 必须是 包含 msg_md5 和 msg 元素的数组,不然后端程序不会处理该消息 * msg_md5 消息身份标识 */ if( !isset($message['msg_md5']) || empty($message['msg_md5']) || !isset($message['msg']) || $check_rule_time<1 ) { return false; } $direct_exchange_name = 'direct.exchange.'.$queue_name; $routingkey_name = 'routingkey.'.$queue_name; $queue_name = 'queue.'.$queue_name; //创建exchange名称和类型 $exchange = new \AMQPExchange($this->channel); $exchange->setName($direct_exchange_name); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); // 持久化 $exchange->declareExchange(); //创建queue名称,使用exchange,绑定routingkey $queue = new \AMQPQueue($this->channel); $queue->setName($queue_name); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); $queue->bind($direct_exchange_name, $routingkey_name); $attrs = []; $attrs['content_type'] = 'application/json'; $attrs['content_encoding'] = 'utf-8'; $attrs['timestamp'] = $check_rule_time; // $attrs['delivery_mode'] = '2'; // 持久化消息 // 消息发布 // $flag=AMQP_MANDATORY 在发布消息时,消息必须被路由到一个有效的队列中。如果不是,将返回一个错误。 $this->channel->startTransaction(); $exchange->publish(json_encode($message), $routingkey_name, $flag=AMQP_MANDATORY, $attrs); $this->channel->commitTransaction(); return true; } /** * 链接rabbitmq 只运行在够着函数使用 * @param $options['host'] string 服务地址 * @param $options['port'] int 端口 * @param $options['login'] string 登录账号 * @param $options['password'] string 账号密码 * @param $options['vhost'] string 虚拟站点 * @access protected * @author leeyi * @return $connect */ protected function connect($options=[]) { //连接RabbitMQ $amqp = \Yii::$app->params['amqp']; $options = array_merge( [ 'host'=>$amqp['host'] , 'port'=> $amqp['port'], 'vhost'=>$amqp['vhost'], 'login'=>$amqp['user'], 'password'=>$amqp['password'], ], (array)$options); $connection = new \AMQPConnection($options); $connection->connect(); return $connection; } /** * 析构函数 * @access public */ public function __destruct() { // 断开链接 $this->connection && $this->connection->disconnect(); }}