Spring Integration in Action學習筆記(1)

Z-xuan Hong
21 min readJan 17, 2021

--

說到撰寫應用程式大家應該都很熟悉,但在應用程式越來越複雜的情況下,跟不同應用程式整合也變得非常重要,此篇文章紀錄學習Spring Integration框架的心得,也希望此文章能夠達到以下目標:

  • 了解Event-Driven Architecture
  • 了解Spring Integration
  • 了解如何實際應用Spring Integration

Event-Driven Architecture

剛開始初學習程式時,我們被教導程式的執行是由左到右、由上到下依序根據每行指令(Instruction)執行,這種同步(Synchronization)的方式能夠幫助我們釐清執行的順序或是軟體元件之間的互動,物件導向中物件與物件之間大部分也是透過同步的方式傳遞訊息(Message)互相合作。

這種方式大部分時候都沒有太大問題,程式碼的控制流程(Control Flow)也非常簡單、好懂,但如果在應用程式(Application)之間的整合使用同步的方式卻會有以下問題:

  • 耦合(Coupling):應用程式要能夠達到Self-Contained的目標,彼此要能夠獨立成長(Evolve Independently)、獨立運作,讓彼此的影響降到最低,同步的整合方式,會產生過強的依賴,因為應用程式需明確知道要跟哪個目標應用程式(Target Application)進行溝通,當應用程式之間過度依賴對方的資訊、過度互相進行假設(Make Assumption)便會增加彼此之間的耦合性。
  • 擴展(Scalability):使用同步的整合方式,代表Sender需要等待Receiver的回應,當Request量很大時,應用程式便會消耗資源導致沒辦法處理其他的Request。
  • 容錯(Fault tolerance):使用同步的整合方式,代表當被依賴應用程式(Depended Application)掛掉時,相依應用程式(Dependent Application)便會失效(Failure),即便透過Retry重試,效果也有限,況且客戶端也無法等這麼久,會降低整體使用者體驗。

試著想想在現實生活中,當電信商寄給帳單時(Sender Side),我們不會馬上進行繳費(Receiver Side),而是等待我們有空時,再進行繳費,這就是非同步的例子,試著想想如果用同步的方式處理,會造成我們多大的困擾。

What is Event-Driven Architecture:透過事件(Event)的方式來整合應用程式之間的溝通,事件以非同步的方式進行發送。常見溝通的方式有兩種:

  • Bidirectional Communication:當Sender送出事件後,不會等待Receiver回應,當Receiver處理完畢後,再透過事件的方式通知Sender,讓Sender處理完後續流程等等。
  • Unidirectional Communication:Sender通知Receiver處理下一個步驟,Receiver則不會回傳事件給Sender。

Why Event-Driven:

  • 耦合(Coupling):應用程式能夠達到Self-Contained的目標,能夠獨立成長(Evolve Independently)、獨立運作,讓彼此的影響降到最低,降低彼此之間的耦合,因為該應用程式只知道發送了什麼Message,不知道哪個目標應用程式處理此Message,讓應用程式之間的互相假設降到最低。
  • 擴展(Scalability):由於Sender不會等待Receiver的回應,Sender能夠處理更多的Request,也能在Receiver端增加多個Consumer來處理事件,能大大增加應用程式整體的擴展性。
  • 容錯(Fault tolerance):SenderReceiver之間透過Channel的方式溝通,當Receiver掛掉時,Channel能夠把Event Buffer起來,等待Receiver狀態回覆後再進行後續處理。

Spring Integration

What is Spring Integration:Spring Integration是一種實作各種Integration Pattern的框架,,讓我們使用Event-Driven的方式進行應用程式之間的整合。

Why Spring Integration:他幫助我們達到以下幾件事情:

  • 透過IOC(Inversion of Control)的方式讓應用程式不會察覺Message Infrastructure的存在,讓我們專注開發商業邏輯元件。
  • 透過Configuration的方式,我們能夠輕易的切換應用程式Runtime的溝通方式,像是同步溝通、非同步溝通等等。
  • 由於Spring Integration已經幫我們把Message Infrastructure開發好,並且測試好,我們只需要測試商業邏輯元件,與簡單的Happy Path整合測試即可,大幅地降低我們需要撰寫的測試和Message Infrastructure元件程式。

Abstraction:在Spring Integration中有三大抽象元件是我們必須要知道的,分別是:

  • Message:此元件是應用程式之間的Contract,通常Message會包含兩種資訊,分別是Payload與HeaderPayload用來儲存應用程式之間溝通的資訊、Header則是用來存儲Message的Metadata。
  • Message Channel:此元件負責傳遞Message,Sender透過Message Channel傳送Message,讓Sender與Receiver解耦合,Sender與Receiver也不需要知道Message Channel實際上是以同步或是非同步的方式傳遞訊息,能讓應用程式之間能達到Loose Coupling
  • Message Endpoint此元件為Message Channel的起始點與終點,負責連接應用程式的Service到適當的Channel

以下方別介紹三種抽象化的實作細節:

Message:

  • Command Message:用來讓Receiver執行某件操作。
  • Notification Message:用來通知Receiver某些事件已經發生。
  • Document Message :用來傳送資料。

Message Channel

  • Subscribable Channel:透過同步的方式傳遞訊息,Receiver與Sender會在同一個Transaction Context。
  • Pollable Channel:透過非同步的方式傳遞訊息到Message Queue,Message Endpoint再透過Poller抓取訊息並且傳遞給Service執行。

Message Endpoint

  • Channel Adapter:透過單向的方式連接應用程式到Message Infrastructure。
  • Gateway:透過雙向的方式連接應用程式到Message Infrastructure,主要用來封裝Messaging-Specific Method Calls,並且Expose Domain Specific介面給應用程式(符合Dependency Inversion Principle)
  • Service Activator:將Service整合到Message Infrastructure並且讓其他應用程式能夠透過Message的方式進行存取,Spring Integration中此元件通常作用於Local Method Invocation
  • Router:根據Message的Payload來決定此Message要被傳送到哪個Message Channel
  • Splitter:將接收到的Message拆解成多個Message,使用依序或是同步的方式送到Message Channel。
  • Aggregator:將接受到的多個Message組合起來,在送到Message Channel。

Spring Integration實際範例

以下透過簡單的應用情境示範Spring Integration:

當使用者放置訂單時,便會通知訂單紀錄系統,此系統進行訂單的紀錄。

為了達到以上目標,我們需要OrderServiceOrderRecordService,還有order-service Message Channel來進行整合,以下為設計圖。

範例程式架構圖

我們先來看OrderService的測試案例與實作:

  • 測試案例:我們可以看到,當呼叫OrderService的PlaceOrder時,我們驗證Order確實被成功建立、OrderPlaced事件成功被發送

此測試有幾點值得注意:

  • 測試以Usecase的方式撰寫,能夠增加可讀性
  • 測試透過Mock的方式驗證事件有被正確發送
  • 測試透過Mock的方式隔離測試對於Database的依賴
  • Mock DomainEventPublisher而不是OrderDomainEventPublisher能夠降低測試對於Application Code的耦合,因為OrderDomainEventPublisher是根據Domain設計的介面,如果Mock OrderDomainEventPublisher會導致當Domain介面的重構時,測試會失敗(Fragile Test),因此Mock系統邊界的元件(DomainEventPublisher),來提升測試的穩定性。
class OrderServiceTest {
private final String orderName = "First Order";
private final double orderPrice = 199.9;
private final FakeOrderRepository orderRepository = new FakeOrderRepository();
private final FakeDomainEventPublisher<OrderPlaced> domainEventPublisher = new FakeDomainEventPublisher<>();

@Test
void place_order() {
// Arrangement
OrderService orderService = createOrderService(domainEventPublisher);
// Act
Result<Long> result = orderService.placeOrder(orderName, orderPrice);
// Assertion
assertTrue(result.isSuccess());
Long orderId = result.getValue();

assertCreatedOrderSuccessfully(orderId);

OrderPlaced orderPlaced = new OrderPlaced(orderId, orderName, orderPrice);
domainEventPublisher
.should()
.published(orderPlaced);
}

private OrderService createOrderService(DomainEventPublisher<OrderPlaced> domainEventPublisher) {
OrderDomainEventPublisher orderDomainEventPublisher = new OrderDomainEventPublisher(domainEventPublisher);
return new OrderService(orderRepository, orderDomainEventPublisher);
}

private void assertCreatedOrderSuccessfully(Long orderId) {
Optional<Order> optionalOrder = orderRepository.findOrder(orderId);
assertTrue(optionalOrder.isPresent());
Order order = optionalOrder.get();
assertEquals(orderName, order.getName());
assertEquals(orderPrice, order.getPrice());
}
}
  • 實作 (以下實作省略錯誤處理):我們可以看到,當呼叫OrderService的PlaceOrder時,會產生一個Order物件並且透過Repository儲存,再透過Publisher發布OrderPlaced事件。
@Service
public class OrderService {
private final OrderRepository orderRepository;
private final OrderDomainEventPublisher publisher;

@Autowired
public OrderService(OrderRepository orderRepository, OrderDomainEventPublisher publisher) {
this.orderRepository = orderRepository;
this.publisher = publisher;
}

@Transactional
public Result<Long> placeOrder(String orderName, double orderPrice) {
Long orderId = orderRepository.nextIdentity();
Order order = new Order(orderId, orderName, orderPrice);
orderRepository.save(order);
publisher.orderPlaced(orderId, orderName, orderPrice);
return Result.success(orderId);
}
}
public class OrderPlaced implements DomainEvent {
private final Long orderId;
private final String name;
private final double price;

public OrderPlaced(Long orderId, String name, double price) {
this.orderId = orderId;
this.name = name;
this.price = price;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OrderPlaced that = (OrderPlaced) o;
return Double.compare(that.price, price) == 0 &&
Objects.equals(orderId, that.orderId) &&
Objects.equals(name, that.name);
}

@Override
public int hashCode() {
return Objects.hash(orderId, name, price);
}
}
@Component
public class OrderDomainEventPublisher {
private final DomainEventPublisher<OrderPlaced> domainEventPublisher;

@Autowired
public OrderDomainEventPublisher(DomainEventPublisher<OrderPlaced> domainEventPublisher) {
this.domainEventPublisher = domainEventPublisher;
}

public void orderPlaced(Long orderId, String name, double price) {
OrderPlaced orderPlaced = new OrderPlaced(orderId, name, price);
domainEventPublisher.publish(orderPlaced);
}
}
public interface DomainEventPublisher<T extends DomainEvent> {
void publish(T domainEvent);
}

以下為OrderRecordService的實作 (單純示範,只儲存在List裡面)

@Service
public class OrderRecordService {
private List<OrderPlaced> orderPlacedList = new ArrayList<>();

public void recordOrder(OrderPlaced orderPlaced) {
this.orderPlacedList.add(orderPlaced);
}

public boolean contains(OrderPlaced orderPlaced) {
if (orderPlacedList.size() == 0)
return false;
return orderPlacedList
.stream()
.anyMatch(op -> op.equals(orderPlaced));
}
}

以下為兩個Service的整合測試:

@TestPropertySource(locations="classpath:application-test.yml")
@SpringBootTest(
classes = DemoApplication.class,
webEnvironment = SpringBootTest.WebEnvironment.NONE
)
@ActiveProfiles("test")
class MessageTest {
@Autowired
OrderService orderService;
@Autowired
OrderRecordService orderRecordService;

@Test
void record_order_service_should_record_when_order_placed() {
String name = "First Order";
double price = 199.9;
Result<Long> result = orderService.placeOrder(name, price);

OrderPlaced expectedOrderPlaced = new OrderPlaced(result.getValue(), name, price);
orderRecordService.contains(expectedOrderPlaced);
}
}

從上面範例程式碼我們發現一件事情,程式中幾乎看不到Spring Integration的影子,這就是Spring Integration想要達到的目標,針對Application Component與Message Infrastructure解耦合。

以下為Spring Integration的XML設定檔案:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<context:component-scan base-package="com.example.demo"/>

<channel id="order-service">
<queue capacity="5" />
</channel>

<gateway
id="domainEventPublisher"
service-interface="com.example.demo.domain.DomainEventPublisher"
default-request-channel="order-service"/>
<outbound-channel-adapter
channel="order-service"
ref="orderRecordService"
method="recordOrder">
<poller fixed-rate="3000000" />
</outbound-channel-adapter>
</beans:beans>

此設定檔案我們看到

  • 宣告一個order-service的Pollable Message Channel
  • 透過Gateway的方式來Proxy DomainEventPublisher
  • 創建一個Channel Adapter來整合OrderRecordService

上述程式碼整體流程如下:

  • 當OrderService的PlaceOrder完成時,會透過DomainEventPublisher發送OrderPlaced事件,DomainEventPublisher為Spring Integration產生的Dynamic Proxy,會自動將事件包裝成Message,並且送到order-service Message Channel。
  • order-service Message-Channel有事件時,Channel Adapter會將Message的Payload抓出來(OrderPlaced),並呼叫OrderRecordService的Record函數。

Spring Integration雖然是在不同Thread執行,但還是在同一個JVM底下執行,如果轉變成Microservices,則需要使用MessageBroker,像是Redis、RabbitMQ等,由於Spring Integration與Spring框架強烈遵守IOC、DI特性,我們可以輕易抽換成MessageBroker,只需要針對DomainEventPublisher抽換即可。

如果想使用DDD的方式切割Bounded Context又不想過早使用分散式系統,Spring Integration能夠幫助我們達到此目標(Defer Decision Making),先使用一個JVM開發,等到需要時在切換成Message Broker,並且讓各個Bounded Context在不同Process上執行。

結論:Event-Driven能讓應用程式彼此之間耦合性更低、Spring Integration透過IOC的方式讓商業邏輯與Message Infrastructure解耦合。

--

--

Z-xuan Hong
Z-xuan Hong

Written by Z-xuan Hong

熱愛軟體開發、物件導向、自動化測試、框架設計,擅長透過軟工幫助企業達到成本最小化、價值最大化。單元測試企業內訓、導入請私訊信箱:t107598042@ntut.org.tw。