null Liferayのメッセージバスを使ってキュー型非同期ジョブを実行する

今回は、Liferayのメッセージバスを使って非同期で1つずつジョブを実行するキュー型ジョブ実行のやり方を紹介したいと思います。

キュー型かどうかにかかわらずUIから時間のかかる操作を実行したい場合など非同期でなんらかのアクションを実行したいケースはさまざま考えられます。Liferayでは非同期実行を実現するためにメッセージバスと呼ばれる機構を備えています。この記事では最初に簡単にLiferayのメッセージバスについて説明したあとに具体的にキュー型のジョブ実行のサンプルを紹介したいと思います。

メッセージバスのエンティティ

メッセージバスでは、さまざまなコンポーネントがメッセージバスに対して宛先であるデスティネーション(Destination)を指定して、メッセージを非同期で送付することができます。

このメッセージバスはいわゆるオブザーバー(リスナー)のパターンに沿って拡張が可能です。具体的にはデスティネーションを登録する処理(DestinationConfiguration)を開発しデスティネーションをサブスクライブするMessageListenerを記述することにより新しい非同期処理を追加することができます。このデスティネーションの登録処理を担うクラスはDestinationConfigurationと呼ばれます。

またデスティネーションには、到来したメッセージを並列で処理するAsyncDestinationと直列で処理するSerialDestinationの2種類があります。

これらをまとめると以下のような図となります。

キュー型非同期ジョブ実行のサンプル紹介

以下ではキュー型の非同期ジョブ実行のコードサンプルを紹介します。

まず最初はDestinationを登録するDestinationConfigurationのコードです。

package QueueJobForBlog;

import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.DestinationConfiguration;
import com.liferay.portal.kernel.messaging.DestinationFactory;
import com.liferay.portal.kernel.util.HashMapDictionary;

import java.util.Dictionary;
import java.util.Map;

import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;

@Component(
		immediate = true,
		service = {}
		)
public class QueueConfiguration {

	@Activate
	public void activate(
			BundleContext bundleContext, Map<String, Object> properties) {


		DestinationConfiguration destinationConfiguration =
				DestinationConfiguration.createSerialDestinationConfiguration(QueueConstants.DESTINATION_NAME);

		destinationConfiguration.setMaximumQueueSize(1000);

		Destination destination = this._destinationFactory.createDestination(destinationConfiguration);

		Dictionary<String, object> dictionary = new HashMapDictionary<>();

		dictionary.put("destination.name", destination.getName());

		_serviceRegistration = bundleContext.registerService(
				Destination.class, destination, dictionary);
	}

	@Deactivate
	public void deactivate() {
		if (_serviceRegistration != null) {
			_serviceRegistration.unregister();
		}
	}

	@Modified
	public void modified(
			BundleContext bundleContext, Map<String, object> properties) {

		deactivate();
		activate(bundleContext, properties);
	}


	@Reference
	private DestinationFactory _destinationFactory;

	private ServiceRegistration<destination> _serviceRegistration;
}

OSGiフレームワークにDeclarativeServiceとしてDestinationを登録しています。ここではDestinationとしてキュー型のSerialDestinationを指定しています。

次にメッセージを処理するMessageListenerです。

package QueueJobForBlog;

import com.liferay.portal.kernel.messaging.BaseMessageListener;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageListener;

import org.osgi.service.component.annotations.Component;

@Component(
        immediate = true,
        property = "destination.name=" + QueueConstants.DESTINATION_NAME,
        service = MessageListener.class
)
public class QueueJobListener extends BaseMessageListener {

	@Override
	protected void doReceive(Message message) throws Exception {

		System.out.println("message received");
		System.out.println(message.getInteger("myId"));
		
		Thread.sleep(3 * 1000);
		
		System.out.println("message processed");
		
	}
}

サブスクライブするデスティネーションは、サービスのアノテーションで指定します。

最後にメッセージバスに複数のイベントを登録して、キューの状態を確認するgogo Shellコマンドを用意します。

package QueueJobForBlog;

import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.DestinationStatistics;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;

import java.util.ArrayList;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;

@Component(
		property = {
				"osgi.command.function=regEvent",
				"osgi.command.function=showDest",
				"osgi.command.scope=blog"		
		},
		immediate = true,
		service = Object.class
		)
public class QueueCommand {

	public void regEvent(int numEvent) {

		for(int i = 0 ; i < numEvent; i++ ) {
			Message message = new Message();
			message.put("myId", i);
			message.setPayload(new ArrayList;<Long>());
			_messageBus.sendMessage(QueueConstants.DESTINATION_NAME, message);
			System.out.println("message sent: " + i);
		}

	}
	
	public void showDest() {
		Destination dest = this._messageBus.getDestination(QueueConstants.DESTINATION_NAME);	
		DestinationStatistics stats = dest.getDestinationStatistics();
		System.out.println("-- PendingMessageCount:" + stats.getPendingMessageCount());
		System.out.println("-- SendMessageCount:" + stats.getSentMessageCount());
	}

	@Reference
	private MessageBus _messageBus;
}

デスティネーションの名前を指定してメッセージバスにメッセージを送付しています。実行すると以下のようにキューに溜まったメッセージが処理されていくのが確認できます。

g! regEvent 10
message sent: 0
message received
message sent: 1
message sent: 2
message sent: 3
message sent: 4
message sent: 5
message sent: 6
message sent: 7
message sent: 8
message sent: 9
0
g!
message processed
message received
1
g! showDest
-- PendingMessageCount:8
-- SendMessageCount:1
...
message received
3
$! showDest
-- PendingMessageCount:6
-- SendMessageCount:3
...
message received
9
g! showDest
-- PendingMessageCount:0
-- SendMessageCount:10

まとめ

メッセージバスを用いて非同期処理を実行する方法を簡単に説明しました。すこし用語が分かりにくいですが慣れてしまえば、とてもシンプルに非同期処理を追加できるのでぜひ試してみてください。

RANKING
2021.1.08
2020.12.28
2020.12.01
2020.10.30
2020.12.18