判断设备是否在线,一般情况是通过发送心跳包来实现,超过设定的时间段而设备未回复,判断为设备离线
Nginx Gatway
设备访问后台,默认url以device_connection_servic开头, 会转发后台device-connection-service,该服务使用GRPC协议
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 
 |     server {limit_req zone=reqlimit burst=100;
 listen 8082 ssl http2 so_keepalive=5:3:1;
 listen [::]:8082 ssl http2 so_keepalive=5:3:1;
 server_name  localhost;
 ssl_certificate /etc/nginx/ssl/nginx.crt;
 ssl_certificate_key /etc/nginx/ssl/nginx.key;
 
 resolver 127.0.0.11 valid=60s;
 resolver_timeout 3s;
 
 location /device_connection_service {
 set $forward_device_connection grpc://device-connection-service:50051;
 grpc_pass $forward_device_connection;
 proxy_connect_timeout 1d;
 proxy_send_timeout 1d;
 proxy_read_timeout 1d;
 grpc_connect_timeout 1d;
 grpc_send_timeout 1d;
 grpc_read_timeout 1d;
 grpc_socket_keepalive on;
 }
 }
 
 | 
GRPC协议定义
Proto
与设备端通信接口
| 1
 | rpc GetActionStream(stream ActionStreamRequest) returns (stream Action);
 | 
与后台调用通信接口
| 12
 
 | rpc CheckStatus(CheckStatusRequest) returns (CheckStatusResponse);
 
 | 
Device 心跳服务
GetActionStream主要逻辑
检测逻辑 接收设备发送信息。如果stream方式,接收srv.Recv设备消息
通过定时器,判断是否超时
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 
 | for {go func() {
 _, err := srv.Recv()
 done <- err
 }()
 
 
 timer := time.NewTimer(GET_ACTION_STREAM_RECV_TIMEOUT_IN_SECS * time.Second)
 
 select {
 
 
 case <-timer.C:
 timeoutCount += 1
 
 case <-isDisconnected:
 return
 case err := <-done:
 if err == io.EOF {
 eofCount += 1
 } else {
 eofCount = 0
 timeoutCount = 0
 }
 }
 
 | 
由于设定的stream为双向的,后台也可以主动与设备通信srv.Send,来检测设备是否正常。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 
 | channel := make(chan *deviceConnectionPb.Action, 1)actionChanMap.Store(deviceId, channel)
 
 for {
 
 select {
 case action := <-channel:
 log.Info(fmt.Sprintf("send to %s, action %s", deviceId, action.Type))
 srv.Send(action)
 
 case <-connectionFailed:
 
 log.Info("Device " + deviceId + " max EOF or timeout count exceeded, force disconnect from server.")
 actionChanMap.Delete(deviceId)
 closeStreamChanMap.Delete(deviceId)
 isDisconnected <- true
 return nil
 
 
 case <-timer.C:
 srv.Send(&deviceConnectionPb.Action{
 Type: deviceConnectionPb.ActionType_PING,
 })
 }
 
 | 
Backend status获取
后台请求Device conntion,通过CheckStatus接口,获取在线状态
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 
 | rpc CheckStatus(CheckStatusRequest) returns (CheckStatusResponse);message CheckStatusRequest {
 string device_id = 1;
 string device_ip = 2;
 }
 
 message CheckStatusResponse {
 ResponseStatus status = 1;
 CheckStatusResponseData data = 2;
 }
 
 message CheckStatusResponseData {
 bool is_online = 1;
 }
 
 | 
后台判断是否在线
| 12
 3
 
 | if _, exist := actionChanMap.Load(req.GetDeviceId()); exist {isOnline = true
 }
 
 |