Selaa lähdekoodia

Merge branch 'master' of https://git.steerinfo.com/XTEMS/xt-ems-api

lirl 3 vuotta sitten
vanhempi
commit
3990d3481c

+ 0 - 75
src/main/java/com/steerinfo/ems/Utils/WebSocket.java

@@ -1,75 +0,0 @@
-package com.steerinfo.ems.Utils;
-
-import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.RestController;
-
-import javax.websocket.OnClose;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-/**
- * @author Shadow
- * @create 2021-09-18 14:54
- * @project xt-ems-api
- */
-@ServerEndpoint("/websocket/{shopId}")
-@RestController
-@Component
-//此注解相当于设置访问URL
-public class WebSocket {
-
-    private Session session;
-
-    private static CopyOnWriteArraySet<WebSocket> webSockets =new CopyOnWriteArraySet<>();
-    private static Map<String,Session> sessionPool = new HashMap<String,Session>();
-
-    @OnOpen
-    public void onOpen(Session session, @PathParam(value="shopId")String shopId) {
-        this.session = session;
-        webSockets.add(this);
-        sessionPool.put(shopId, session);
-        System.out.println("【websocket消息】有新的连接,总数为:"+webSockets.size());
-    }
-
-    @OnClose
-    public void onClose() {
-        webSockets.remove(this);
-        System.out.println("【websocket消息】连接断开,总数为:"+webSockets.size());
-    }
-
-    @OnMessage
-    public void onMessage(String message) {
-        System.out.println("【websocket消息】收到客户端消息:"+message);
-    }
-
-    // 此为广播消息
-    public void sendAllMessage(String message) {
-        for(WebSocket webSocket : webSockets) {
-            System.out.println("【websocket消息】广播消息:"+message);
-            try {
-                webSocket.session.getAsyncRemote().sendText(message);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    // 此为单点消息
-    public void sendOneMessage(String shopId, String message) {
-        Session session = sessionPool.get(shopId);
-        if (session != null) {
-            try {
-                session.getAsyncRemote().sendText(message);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-}

+ 1 - 1
src/main/java/com/steerinfo/ems/emsgmpcjh/controller/EmsGmPcJhController.java

@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.steerinfo.auth.utils.JwtUtil;
 import com.steerinfo.ems.Utils.DateUtils;
 import com.steerinfo.ems.Utils.ExcelToolUtils;
-import com.steerinfo.ems.Utils.WebSocket;
+import com.steerinfo.ems.websocket.controller.WebSocket;
 import com.steerinfo.ems.emsgmpcjh.mapper.EmsGmPcJhMapper;
 import com.steerinfo.ems.emsgmpcjh.model.EmsGmPcJh;
 import com.steerinfo.ems.emsgmpcjh.service.IEmsGmPcJhService;

+ 1 - 4
src/main/java/com/steerinfo/ems/emsprodplanweightadjustment/controller/EmsProdplanWeightAdjustmentController.java

@@ -2,7 +2,7 @@ package com.steerinfo.ems.emsprodplanweightadjustment.controller;
 
 import com.alibaba.fastjson.JSON;
 import com.steerinfo.auth.utils.JwtUtil;
-import com.steerinfo.ems.Utils.WebSocket;
+import com.steerinfo.ems.websocket.controller.WebSocket;
 import com.steerinfo.ems.emsgmpcjh.mapper.EmsGmPcJhMapper;
 import com.steerinfo.ems.emsgmpcjh.model.EmsGmPcJh;
 import com.steerinfo.ems.emsprodplanweightadjustment.mapper.EmsProdplanWeightAdjustmentMapper;
@@ -15,13 +15,10 @@ import com.steerinfo.ems.emsprodplanweightadjustment.service.IEmsProdplanWeightA
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shiro.authz.annotation.RequiresPermissions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.*;
-import java.math.BigDecimal;
 
 /**
  * EmsProdplanWeightAdjustment RESTful接口:

+ 148 - 0
src/main/java/com/steerinfo/ems/websocket/controller/DirectRabbitConfig.java

@@ -0,0 +1,148 @@
+package com.steerinfo.ems.websocket.controller;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.annotation.EnableRabbit;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.Enumeration;
+
+/**
+ * @author Shadow
+ * @create 2021-10-12 14:00
+ * @project xt-ems-api-new
+ * @description: 直连交换机队列配置文件
+ */
+@Configuration
+@EnableRabbit
+public class DirectRabbitConfig {
+    @Value("${spring.rabbitmq.wsExchangeName}")
+    private String WSExchangName;
+    /*@Autowired
+    StringRedisTemplate stringRedisTemplate;*/
+    //动态获取节点的IP
+    String ipaddr;
+   /* {
+        try {
+            ipaddr = InetAddress.getLocalHost().getHostAddress().replace(".","");
+            //redisTemplate.opsForValue().set(ipaddr,ipaddr);
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+    }*/
+
+    /**
+     * @Description: 根据IP地址动态的创建队列
+     * @Param:
+     * @return:
+     * @Author:
+     * @Date: 2020/5/18
+     */
+    @Bean
+    public Queue createWsDirectQueue(){
+        ipaddr = getInternetIp().replace(".","")+"8088";
+        return new Queue(ipaddr);
+    }
+    /**
+     * @Description: 创建直连交换机
+     * @Param:
+     * @return:
+     * @Author:
+     * @Date: 2020/5/18
+     */
+    @Bean
+    public DirectExchange createWsDirectExchange(){
+        return new DirectExchange(WSExchangName);
+    }
+
+    //@Bean
+    //public ExchangeBuilder createWsDirectExchange(){
+    //    return new ExchangeBuilder(WSExchangName,"fanout");
+    //}
+    /**
+     * @Description: 将队列与直连交换机绑定,并指定ip地址为路由
+     * @Param:
+     * @return:
+     * @Author:
+     * @Date: 2020/5/18
+     */
+    @Bean
+    public Binding bindingExchange(){
+        return BindingBuilder.bind(createWsDirectQueue()).to(createWsDirectExchange()).with("ems");
+    }
+
+    /**
+     * @Description:  获取外网IP
+     * @Param:
+     * @return:
+     * @Author:
+     * @Date: 2020/5/22
+     */
+    private  String getInternetIp(){
+        try{
+            Enumeration<NetworkInterface> networks = NetworkInterface.getNetworkInterfaces();
+            InetAddress ip = null;
+            Enumeration<InetAddress> addrs;
+            while (networks.hasMoreElements())
+            {
+                addrs = networks.nextElement().getInetAddresses();
+                while (addrs.hasMoreElements())
+                {
+                    ip = addrs.nextElement();
+                    if (ip != null
+                            && ip instanceof Inet4Address
+                            && ip.isSiteLocalAddress()
+                    )
+                    {
+                        return ip.getHostAddress();
+                    }
+                }
+            }
+
+            // 如果没有外网IP,就返回内网IP
+            return "";
+        } catch(Exception e){
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @Description: 根据类型获取IP地址
+     * @Param:
+     * @return:
+     * @Author:
+     * @Date: 2020/5/22
+     */
+    private String getInternetIp(String type) {
+        try {
+            Enumeration<NetworkInterface> networks = NetworkInterface.getNetworkInterfaces();
+            while (networks.hasMoreElements()) {
+                NetworkInterface ni = (NetworkInterface) networks.nextElement();
+                if (!ni.getName().equals(type)) {
+                    continue;
+                } else {
+                    Enumeration<?> e2 = ni.getInetAddresses();
+                    while (e2.hasMoreElements()) {
+                        InetAddress ia = (InetAddress) e2.nextElement();
+                        if (ia instanceof Inet6Address) {
+                            continue;
+                        }
+                        return ia.getHostAddress();
+                    }
+                }
+            }
+            // 如果没有外网IP,就返回内网IP
+            return "";
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

+ 54 - 0
src/main/java/com/steerinfo/ems/websocket/controller/DirectRabbitMsgReceiveConfig.java

@@ -0,0 +1,54 @@
+package com.steerinfo.ems.websocket.controller;
+
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author Shadow
+ *  @description:由于@RabbitListener的队列名称只能是常量,所以@RabbitListener监听注解
+ *              不能使用,只能使用rabbitmq的api接口
+ * @create 2021-10-11 17:24
+ * @project xt-ems-api-new
+ */
+//@Configuration
+//@EnableRabbit
+@Configuration
+@AutoConfigureAfter(RabbitTemplate.class)
+public class DirectRabbitMsgReceiveConfig {
+//    /**
+//     * json序列化
+//     * @return
+//     */
+//    @Bean
+//    public Queue helloQueue(){
+//        return new Queue("ems_webSocket");
+//    }
+//
+    /**
+     * 使用orderMqConsumer的orderShow方法作为listener。
+     * @return
+     */
+    @Bean("orderShowMessageListener")
+    MessageListenerAdapter orderShowMessageListener() {
+        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new WebSocket());
+        //WebSocket实现了ChannelAwareMessageListener类则可以不用指定默认的监听方法
+        //messageListenerAdapter.setDefaultListenerMethod("handleMessage1");
+        return messageListenerAdapter;
+    }
+    //
+    @Bean("orderShowMessageListenerContainer")
+    SimpleMessageListenerContainer orderShowMessageListenerContainer(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory, @Qualifier("orderShowMessageListener") MessageListenerAdapter listenerAdapter, @Qualifier("createWsDirectQueue") Queue queue) {
+        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
+        container.setConnectionFactory(connectionFactory);
+        //只接收该节点的队列信息
+        container.setQueueNames(queue.getName());
+        container.setMessageListener(listenerAdapter);
+        return container;
+    }
+}

+ 142 - 0
src/main/java/com/steerinfo/ems/websocket/controller/WebSocket.java

@@ -0,0 +1,142 @@
+package com.steerinfo.ems.websocket.controller;
+
+import com.rabbitmq.client.Channel;
+import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.websocket.OnClose;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @author Shadow
+ * @create 2021-09-18 14:54
+ * @project xt-ems-api
+ * 此注解相当于设置访问URL
+ *
+ */
+@ServerEndpoint("/websocket/{shopId}")
+@RestController
+@Component
+public class WebSocket implements ChannelAwareMessageListener {
+    @Autowired
+    private AmqpTemplate rabbitTemplate;
+
+    @Autowired
+    private Queue createWsDirectQueue;
+    @Value("${spring.rabbitmq.wsExchangeName}")
+    private String wsDirectExchange;
+
+    //RabbitTemplate rabbitTemplate = ApplicationContext.getBean(RabbitTemplate.class);
+
+    private Session session;
+
+    private static final CopyOnWriteArraySet<WebSocket> webSockets =new CopyOnWriteArraySet<>();
+    private static final Map<String,Session> sessionPool = new HashMap<String,Session>();
+
+    /**
+     * 发送到消息队列
+     * @param msg
+     */
+    public void sendMessage(String msg){
+        rabbitTemplate.convertAndSend("ems_webSocket",msg);
+    }
+
+    //@RabbitListener(queues = "ems_webSocket")
+    public void receiveMessage(String msg){
+        System.out.println("收到消息:"+msg);
+        for(WebSocket webSocket : webSockets) {
+            System.out.println("【websocket消息】rabbitMq广播消息:"+msg);
+            try {
+                // 判断to用户是否在线
+                //if (webSocket.session != null && webSocket.session.isOpen()) {
+                //    //TODO 具体格式需要和前端对接
+                //    //toSession.sendMessage(new TextMessage(msgJson));
+                //    // 更新消息状态为已读
+                //    //this.messageDAO.updateMessageState(message.getId(), 2);
+                   webSocket.session.getAsyncRemote().sendText(msg);
+                //}else{
+                //    // 该用户可能下线,可能在其他的节点中,发送消息到MQ系统
+                //    // 需求:添加tag,便于消费者对消息的筛选
+                //    this.rabbitTemplate.convertAndSend("ems_webSocket",message);
+                //}
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @OnOpen
+    public void onOpen(Session session, @PathParam(value="shopId")String shopId) {
+        this.session =  session;
+        webSockets.add(this);
+        sessionPool.put(shopId, session);
+        System.out.println("【websocket消息】有新的连接,总数为:"+webSockets.size());
+    }
+
+    @OnClose
+    public void onClose() {
+        webSockets.remove(this);
+        System.out.println("【websocket消息】连接断开,总数为:"+webSockets.size());
+    }
+
+    @OnMessage
+    public void socket_onMessage(String message) {
+        System.out.println("【websocket消息】收到客户端消息:"+message);
+        //this.sendAllMessage(message);
+        //WebSocket webSocket = new WebSocket();
+        //webSocket.sendAllMessage(message);
+    }
+
+    // 此为广播消息
+    public void sendAllMessage(String message) {
+        System.out.println("【websocket消息】广播消息:"+message);
+        String queueName = createWsDirectQueue.getName();
+        rabbitTemplate.convertAndSend(wsDirectExchange,"ems",message);
+    }
+
+    // 此为单点消息
+    public void sendOneMessage(String shopId, String message) {
+        Session session = (Session) sessionPool.get(shopId);
+        if (session != null) {
+            try {
+                session.getAsyncRemote().sendText(message);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * 如果没有继承ChannelAwareMessageListener可以将该方法指定为适配器的接收方法,
+     * 如果继承了ChannelAwareMessageListener,则优先执行onMessage方法;
+     * @param msg
+     */
+    public void handleMessage1(String msg) {
+        System.out.println("handleMessage默认方法,消息内容 String:" + msg);
+    }
+
+    @Override
+    public void onMessage(Message message, Channel channel) throws Exception {
+        for(WebSocket webSocket : webSockets) {
+            System.out.println("【websocket消息】rabbitMq广播消息:"+message);
+            try {
+                webSocket.session.getAsyncRemote().sendText(new String(message.getBody()));
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

+ 1 - 2
src/main/java/com/steerinfo/ems/Utils/WebSocketConfig.java → src/main/java/com/steerinfo/ems/websocket/controller/WebSocketConfig.java

@@ -1,4 +1,4 @@
-package com.steerinfo.ems.Utils;
+package com.steerinfo.ems.websocket.controller;
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -22,5 +22,4 @@ public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
     public ServerEndpointExporter serverEndpointExporter() {
         return new ServerEndpointExporter();
     }
-
 }

+ 6 - 6
src/main/java/com/steerinfo/ftp/uploadfile/utils/FtpFileUtil.java

@@ -68,10 +68,10 @@ public class FtpFileUtil {
         boolean result = false;
         try {
             // 连接FTP服务器
-            ftpClient.enterLocalPassiveMode();
             ftpClient.connect(FTP_ADDRESS, FTP_PORT);
             ftpClient.login(FTP_USERNAME, FTP_PASSWORD);
             int reply;
+            ftpClient.enterLocalPassiveMode();
             reply = ftpClient.getReplyCode();
             if (!FTPReply.isPositiveCompletion(reply)) {
                 ftpClient.disconnect();
@@ -196,9 +196,9 @@ public class FtpFileUtil {
         String data = fileName + "预览失败";
         try {
             connectToServer();
-            ftpClient.enterLocalPassiveMode();
             // 设置传输二进制文件
             setFileType(FTP.BINARY_FILE_TYPE);
+            ftpClient.enterLocalPassiveMode();
             int reply = ftpClient.getReplyCode();
             if(!FTPReply.isPositiveCompletion(reply)){
                 ftpClient.disconnect();
@@ -272,9 +272,9 @@ public class FtpFileUtil {
         boolean flag = false;
         try{
             connectToServer();
-            ftpClient.enterLocalPassiveMode();
             // 设置传输二进制文件
             setFileType(FTP.BINARY_FILE_TYPE);
+            ftpClient.enterLocalPassiveMode();
             int reply = ftpClient.getReplyCode();
             if(!FTPReply.isPositiveCompletion(reply)){
                 ftpClient.disconnect();
@@ -318,9 +318,9 @@ public class FtpFileUtil {
         ByteArrayOutputStream os = new ByteArrayOutputStream();
         try{
             connectToServer();
-            ftpClient.enterLocalPassiveMode();
             // 设置传输二进制文件
             setFileType(FTP.BINARY_FILE_TYPE);
+            ftpClient.enterLocalPassiveMode();
             int reply = ftpClient.getReplyCode();
             if(!FTPReply.isPositiveCompletion(reply)){
                 ftpClient.disconnect();
@@ -391,11 +391,11 @@ public class FtpFileUtil {
              try {
                 //建立连接
                  ftpClient = new FTPClient();
-                 ftpClient.setControlEncoding("GBK");//windows default服务器gbk
+                 //ftpClient.setControlEncoding("GBK");//windows default服务器gbk
                  ftpClient.setControlEncoding("UTF-8");//linux default UTF-8
-                 ftpClient.enterLocalPassiveMode();
                  ftpClient.connect(FTP_ADDRESS, FTP_PORT);
                  ftpClient.login(FTP_USERNAME, FTP_PASSWORD);
+                 ftpClient.enterLocalPassiveMode();
                  reply = ftpClient.getReplyCode();
                  if(!FTPReply.isPositiveCompletion(reply)){
                      ftpClient.disconnect();