序
本文主要研究一下openmessaging-java
maven
io.openmessaging openmessaging-api 0.3.1-alpha
maven最新的是0.3.1-alpha,这里直接用源码的0.3.2-alpha-SNAPSHOT
producer
Producer
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java
/** * A {@code Producer} is a simple object used to send messages on behalf * of a {@code MessagingAccessPoint}. An instance of {@code Producer} is * created by calling the {@link MessagingAccessPoint#createProducer()} method. ** It provides various {@code send} methods to send a message to a specified destination, * which is a {@code Queue} in OMS. *
* {@link Producer#send(Message)} means send a message to the destination synchronously, * the calling thread will block until the send request complete. *
* {@link Producer#sendAsync(Message)} means send a message to the destination asynchronously, * the calling thread won't block and will return immediately. Since the send call is asynchronous * it returns a {@link Future} for the send result. * * @version OMS 1.0.0 * @since OMS 1.0.0 */public interface Producer extends MessageFactory, ServiceLifecycle { /** * Returns the attributes of this {@code Producer} instance. * Changes to the return {@code KeyValue} are not reflected in physical {@code Producer}. *
* There are some standard attributes defined by OMS for {@code Producer}: *
- *
- {@link OMSBuiltinKeys#PRODUCER_ID}, the unique producer id for a producer instance. *
- {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code Producer}. *
* A transactional message will be exposed to consumer if and only if the local transaction * branch has been committed, or be discarded if local transaction has been rolled back. * * @param message a transactional message will be sent * @param branchExecutor local transaction executor associated with the message * @param attributes the specified attributes * @return the successful {@code SendResult} * @throws OMSMessageFormatException if an invalid message is specified. * @throws OMSTimeOutException if the given timeout elapses before the send operation completes * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error. */ SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes); /** * Sends a message to the specified destination asynchronously, the destination should be preset to * {@link Message#sysHeaders()}, other header fields as well. *
* The returned {@code Promise} will have the result once the operation completes, and the registered * {@code FutureListener} will be notified, either because the operation was successful or because of an error. * * @param message a message will be sent * @return the {@code Promise} of an asynchronous message send operation. * @see Future * @see FutureListener */ Future
* The returned {@code Promise} will have the result once the operation completes, and the registered * {@code FutureListener} will be notified, either because the operation was successful or because of an error. * * @param message a message will be sent * @param attributes the specified attributes * @return the {@code Promise} of an asynchronous message send operation. * @see Future * @see FutureListener */ Future
* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't * care about the send result and also have no context to get the result. * * @param message a message will be sent */ void sendOneway(Message message); /** * Sends a message to the specified destination in one way, using the specified attributes, the destination * should be preset to {@link Message.BuiltinKeys}, other header fields as well. *
* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't * care about the send result and also have no context to get the result. * * @param message a message will be sent * @param properties the specified userHeaders */ void sendOneway(Message message, KeyValue properties); /** * Creates a {@code BatchMessageSender} to send message in batch manner. * * @return a {@code BatchMessageSender} instance */ BatchMessageSender createBatchMessageSender(); /** * Adds a {@code ProducerInterceptor} to intercept send operations of producer. * * @param interceptor a producer interceptor */ void addInterceptor(ProducerInterceptor interceptor); /** * Removes a {@code ProducerInterceptor} * * @param interceptor a producer interceptor will be removed */ void removeInterceptor(ProducerInterceptor interceptor);}
- 这里提供了诸多发送消息的方法,也提供创建BatchMessageSender的方法
- 消息这里封装为Message,设置的属性封装为KeyValue,发送结果封装为SendResult
Message
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/Message.java
/** * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is * {@link BytesMessage}. ** Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of * header and body and is used by separate applications to exchange a piece of information, * like Apache RocketMQ. *
* The header contains fields used by the messaging system that describes the message's meta information, * like QoS level, origin, destination, and so on, while the body contains the application data being transmitted. *
* As for the header, OMS defines two kinds types: System Header and User Header, * with respect to flexibility in vendor implementation and user usage. *
- *
- * System Header, OMS defines some standard attributes that represent the characteristics of the message. * *
- * User Header, some OMS vendors may require enhanced extra attributes of the message * or some users may want to clarify some customized attributes to draw the body. * OMS provides the improved scalability for these scenarios. * *
* In BytesMessage, the body is just a byte array, may be compressed and uncompressed * in the transmitting process by the messaging system. * The application is responsible for explaining the concrete content and format of the message body, * OMS is never aware of that. * * The body part is placed in the implementation classes of {@code Message}. * * @version OMS 1.0.0 * @since OMS 1.0.0 */public interface Message { /** * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}. * * @return the system headers of a {@code Message} * @see BuiltinKeys */ KeyValue sysHeaders(); /** * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. * * @return the user headers of a {@code Message} */ KeyValue userHeaders(); /** * Puts a {@code String}-{@code int} {@code KeyValue} entry to the system headers of a {@code Message}. * * @param key the key to be placed into the system headers * @param value the value corresponding to key */ Message putSysHeaders(String key, int value); /** * Puts a {@code String}-{@code long} {@code KeyValue} entry to the system headers of a {@code Message}. * * @param key the key to be placed into the system headers * @param value the value corresponding to key */ Message putSysHeaders(String key, long value); /** * Puts a {@code String}-{@code double} {@code KeyValue} entry to the system headers of a {@code Message}. * * @param key the key to be placed into the system headers * @param value the value corresponding to key */ Message putSysHeaders(String key, double value); /** * Puts a {@code String}-{@code String} {@code KeyValue} entry to the system headers of a {@code Message}. * * @param key the key to be placed into the system headers * @param value the value corresponding to key */ Message putSysHeaders(String key, String value); /** * Puts a {@code String}-{@code int} {@code KeyValue} entry to the user headers of a {@code Message}. * * @param key the key to be placed into the user headers * @param value the value corresponding to key */ Message putUserHeaders(String key, int value); /** * Puts a {@code String}-{@code long} {@code KeyValue} entry to the user headers of a {@code Message}. * * @param key the key to be placed into the user headers * @param value the value corresponding to key */ Message putUserHeaders(String key, long value); /** * Puts a {@code String}-{@code double} {@code KeyValue} entry to the user headers of a {@code Message}. * * @param key the key to be placed into the user headers * @param value the value corresponding to key */ Message putUserHeaders(String key, double value); /** * Puts a {@code String}-{@code String} {@code KeyValue} entry to the user headers of a {@code Message}. * * @param key the key to be placed into the user headers * @param value the value corresponding to key */ Message putUserHeaders(String key, String value); /** * Get message body * * @param type Message body type * @param
* When a message is sent, MESSAGE_ID is assigned by the producer. */ String MESSAGE_ID = "MESSAGE_ID"; /** * The {@code DESTINATION} header field contains the destination to which the message is being sent. *
* When a message is sent this value is set to the right {@code Queue}, then the message will be sent to * the specified destination. *
* When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. */ String DESTINATION = "DESTINATION"; //...... }}
- Message接口主要定义了设置header以及获取body的方法
- 另外还内置了BuiltinKeys的header,比如MESSAGE_ID,DESTINATION等
KeyValue
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
/** * The {@code KeyValue} class represents a persistent set of attributes, * which supports method chaining. ** A {@code KeyValue} object only allows {@code String} keys and can contain four primitive type * as values: {@code int}, {@code long}, {@code double}, {@code String}. *
* The {@code KeyValue} is a replacement of {@code Properties}, with simpler * interfaces and reasonable entry limits. *
* A {@code KeyValue} object may be used in concurrent scenarios, so the implementation * of {@code KeyValue} should consider concurrent related issues. * * @version OMS 1.0.0 * @since OMS 1.0.0 */public interface KeyValue { /** * Inserts or replaces {@code int} value for the specified key. * * @param key the key to be placed into this {@code KeyValue} object * @param value the value corresponding to key */ KeyValue put(String key, int value); /** * Inserts or replaces {@code long} value for the specified key. * * @param key the key to be placed into this {@code KeyValue} object * @param value the value corresponding to key */ KeyValue put(String key, long value); /** * Inserts or replaces {@code double} value for the specified key. * * @param key the key to be placed into this {@code KeyValue} object * @param value the value corresponding to key */ KeyValue put(String key, double value); /** * Inserts or replaces {@code String} value for the specified key. * * @param key the key to be placed into this {@code KeyValue} object * @param value the value corresponding to key */ KeyValue put(String key, String value); /** * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, zero is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, int) */ int getInt(String key); /** * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, the default value argument is returned. * * @param key the property key * @param defaultValue a default value * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, int) */ int getInt(String key, int defaultValue); /** * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, zero is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, long) */ long getLong(String key); /** * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, the default value argument is returned. * * @param key the property key * @param defaultValue a default value * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, long) */ long getLong(String key, long defaultValue); /** * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, zero is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, double) */ double getDouble(String key); /** * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, the default value argument is returned. * * @param key the property key * @param defaultValue a default value * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, double) */ double getDouble(String key, double defaultValue); /** * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, {@code null} is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, String) */ String getString(String key); /** * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, the default value argument is returned. * * @param key the property key * @param defaultValue a default value * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, String) */ String getString(String key, String defaultValue); /** * Returns a {@link Set} view of the keys contained in this {@code KeyValue} object. *
* The set is backed by the {@code KeyValue}, so changes to the set are * reflected in the @code KeyValue}, and vice-versa. * * @return the key set view of this {@code KeyValue} object. */ Set
keySet(); /** * Tests if the specified {@code String} is a key in this {@code KeyValue}. * * @param key possible key * @return true
if and only if the specified key is in this {@code KeyValue},false
* otherwise. */ boolean containsKey(String key);}
- 主要封装int、long、double、Sting这几种类型,其中key只能为String类型
SendResult
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/SendResult.java
/** * The result of sending a OMS message to server * with the message id and some attributes. * * @version OMS 1.0.0 * @since OMS 1.0.0 */public interface SendResult { /** * The unique message id related to the {@code SendResult} instance. * * @return the message id */ String messageId();}
- 目前仅仅定义一个返回messageId的方法
BatchMessageSender
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/BatchMessageSender.java
/** * A message sender created through {@link Producer#createBatchMessageSender()}, to send * messages in batch manner, and commit or roll back at the appropriate time. * * @version OMS 1.0.0 * @since OMS 1.0.0 */public interface BatchMessageSender { /** * Submits a message to this sender * * @param message a message to be sent * @return this batch sender */ BatchMessageSender send(Message message); /** * Commits all the uncommitted messages in this sender. * * @throws OMSRuntimeException if the sender fails to commit the messages due to some internal error. */ void commit(); /** * Discards all the uncommitted messages in this sender. */ void rollback(); /** * Close this sender. */ void close();}
- 定义了send方法用于批量添加消息,这里的send语义不是太好,用pending好一些
- 然后定义了commit用于提交批量消息、rollback用于回滚、close用于关闭这个sender
consumer
PullConsumer
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PullConsumer.java
/** * A {@code PullConsumer} pulls messages from the specified queue, * and supports submit the consume result by acknowledgement. * * @version OMS 1.0.0 * @see MessagingAccessPoint#createPullConsumer() * @since OMS 1.0.0 */public interface PullConsumer extends ServiceLifecycle { /** * Returns the attributes of this {@code PullConsumer} instance. * Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer}. ** There are some standard attributes defined by OMS for {@code PullConsumer}: *
- *
- {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance. *
- {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PullConsumer}. *
* After the success call, this consumer won't receive new message * from the specified queue any more. * * @param queueName a specified queue * @return this {@code PullConsumer} instance */ PullConsumer detachQueue(String queueName); /** * Receives the next message from the attached queues of this consumer. *
* This call blocks indefinitely until a message is arrives, the timeout expires, * or until this {@code PullConsumer} is shut down. * * @return the next message received from the attached queues, or null if the consumer is * concurrently shut down or the timeout expires * @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error. */ Message receive(); /** * Receives the next message from the attached queues of this consumer, using the specified attributes. *
* This call blocks indefinitely until a message is arrives, the timeout expires, * or until this {@code PullConsumer} is shut down. * * @param attributes the specified attributes * @return the next message received from the attached queues, or null if the consumer is * concurrently shut down or the timeout expires * @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error. */ Message receive(KeyValue attributes); /** * Acknowledges the specified and consumed message with the unique message receipt handle. *
* Messages that have been received but not acknowledged may be redelivered. * * @param receiptHandle the receipt handle associated with the consumed message * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. */ void ack(String receiptHandle); /** * Acknowledges the specified and consumed message with the specified attributes. *
* Messages that have been received but not acknowledged may be redelivered. * * @param receiptHandle the receipt handle associated with the consumed message * @param attributes the specified attributes * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. */ void ack(String receiptHandle, KeyValue attributes);}
- 定义了attachQueue以及attachQueue方法,用于绑定及解绑队列
- receive方法用于接收消息,其实这里用receive语义不是太好,用pull可能好点
- ack方法用于ack消息
PushConsumer
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PushConsumer.java
/** * A {@code PushConsumer} receives messages from multiple queues, these messages are pushed from * MOM server to {@code PushConsumer} client. * * @version OMS 1.0.0 * @see MessagingAccessPoint#createPushConsumer() * @since OMS 1.0.0 */public interface PushConsumer extends ServiceLifecycle { /** * Returns the attributes of this {@code PushConsumer} instance. * Changes to the return {@code KeyValue} are not reflected in physical {@code PushConsumer}. ** There are some standard attributes defined by OMS for {@code PushConsumer}: *
- *
- {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance. *
- {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PushConsumer}. *
* This method resumes the {@code PushConsumer} instance after it was suspended. * The instance will not receive new messages between the suspend and resume calls. * * @throws OMSRuntimeException if the instance has not been suspended. * @see PushConsumer#suspend() */ void resume(); /** * Suspends the {@code PushConsumer} for later resumption. *
* This method suspends the consumer until it is resumed. * The consumer will not receive new messages between the suspend and resume calls. *
* This method behaves exactly as if it simply performs the call {@code suspend(0)}. * * @throws OMSRuntimeException if the instance is not currently running. * @see PushConsumer#resume() */ void suspend(); /** * Suspends the {@code PushConsumer} for later resumption. *
* This method suspends the consumer until it is resumed or a * specified amount of time has elapsed. * The consumer will not receive new messages during the suspended state. *
* This method is similar to the {@link #suspend()} method, but it allows finer control * over the amount of time to suspend, and the consumer will be suspended until it is resumed * if the timeout is zero. * * @param timeout the maximum time to suspend in milliseconds. * @throws OMSRuntimeException if the instance is not currently running. */ void suspend(long timeout); /** * This method is used to find out whether the {@code PushConsumer} is suspended. * * @return true if this {@code PushConsumer} is suspended, false otherwise */ boolean isSuspended(); /** * Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener}. *
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new * delivered message is coming. * * @param queueName a specified queue * @param listener a specified listener to receive new message * @return this {@code PushConsumer} instance */ PushConsumer attachQueue(String queueName, MessageListener listener); /** * Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener} and some * specified attributes. *
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new * delivered message is coming. * * @param queueName a specified queue * @param listener a specified listener to receive new message * @param attributes some specified attributes * @return this {@code PushConsumer} instance */ PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes); /** * Detaches the {@code PushConsumer} from a specified queue. *
* After the success call, this consumer won't receive new message * from the specified queue any more. * * @param queueName a specified queue * @return this {@code PushConsumer} instance */ PushConsumer detachQueue(String queueName); /** * Adds a {@code ConsumerInterceptor} instance to this consumer. * * @param interceptor an interceptor instance */ void addInterceptor(ConsumerInterceptor interceptor); /** * Removes an interceptor from this consumer. * * @param interceptor an interceptor to be removed */ void removeInterceptor(ConsumerInterceptor interceptor);}
- 定义了attachQueue以及detachQueue方法用于绑定及解绑队列
- attachQueue方法有个MessageListener用于消息push的时候的回调处理
MessageListener
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java
/** * A message listener must implement this {@code MessageListener} interface and register * itself to a consumer instance to asynchronously receive messages. * * @version OMS 1.0.0 * @since OMS 1.0.0 */public interface MessageListener { /** * Callback method to receive incoming messages. ** A message listener should handle different types of {@code Message}. * * @param message the received message object * @param context the context delivered to the consume thread */ void onReceived(Message message, Context context); interface Context { /** * Returns the attributes of this {@code MessageContext} instance. * * @return the attributes */ KeyValue attributes(); /** * Acknowledges the specified and consumed message, which is related to this {@code MessageContext}. *
* Messages that have been received but not acknowledged may be redelivered. * * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. */ void ack(); }}
- MessageListener定义了onReceived的回调方法,同时传递Context上下文对象
StreamingConsumer
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingConsumer.java
/** * A {@code StreamingConsumer} provides low level APIs to open multiple streams * from a specified queue and then retrieve messages from them through @{code StreamingIterator}. * * A {@code Queue} is consists of multiple streams, the {@code Stream} is an abstract concept and * can be associated with partition in most messaging systems. * * @version OMS 1.0.0 * @since OMS 1.0.0 */public interface StreamingConsumer extends ServiceLifecycle { /** * Returns the attributes of this {@code StreamingConsumer} instance. * Changes to the return {@code KeyValue} are not reflected in physical {@code StreamingConsumer}. ** There are some standard attributes defined by OMS for {@code StreamingConsumer}: *
- *
- {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance. *
- {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code * StreamingConsumer}. *
* Creates a {@code StreamingIterator} from the begin position if the given position * is earlier than the first message's store position in this stream. *
* Creates a {@code StreamingIterator} from the end position, if the given position * is later than the last message's store position in this stream. *
* The position is a {@code String} value, may represented by timestamp, offset, cursor, * even a casual key. * * @param streamName the specified stream * @param position the specified position * @return a message iterator at the specified position */ StreamingIterator seek(String streamName, String position);}
- 流式处理主要是定义seek方法,返回的是StreamingIterator
StreamingIterator
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingIterator.java
/** * A {@code StreamingIterator} is provided by {@code Stream} and is used to * retrieve messages a specified stream like a read-only iterator. * * @version OMS 1.0.0 * @since OMS 1.0.0 */public interface StreamingIterator { /** * Returns the attributes of this {@code StreamingIterator} instance. ** There are some standard attributes defined by OMS for {@code Stream}: *
- *
- {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code * Stream}. *
* This method may be called repeatedly to iterate through the iteration, * or intermixed with calls to {@link #previous} to go back and forth. * * @return the next message in the list * @throws OMSRuntimeException if the iteration has no more message, or * the the consumer fails to receive the next message */ Message next(); /** * Returns {@code true} if this partition iterator has more messages when * traversing the iterator in the reverse direction. * * @return {@code true} if the partition iterator has more messages when * traversing the iterator in the reverse direction */ boolean hasPrevious(); /** * Returns the previous message in the iteration and moves the offset * position backwards. *
* This method may be called repeatedly to iterate through the iteration backwards, * or intermixed with calls to {@link #next} to go back and forth. * * @return the previous message in the list * @throws OMSRuntimeException if the iteration has no previous message, or * the the consumer fails to receive the previous message */ Message previous(); /** * Returns the position of the message that would be returned by a * subsequent call to {@link #next}. * * @return the position of the next message * @throws OMSRuntimeException if the iteration has no next message */ String nextPosition(); /** * Returns the position of the message that would be returned by a * subsequent call to {@link #previous}. * * @return the position of the previous message * @throws OMSRuntimeException if the iteration has no previous message */ String previousPosition();}
- 是一个read-only的iterator,类似java的iterator,提供了hasNext、next等方法
小结
openmessaging-java的定义java实现OpenMessaging的api规范,其中producer提供了单个发送也提供了批量发送的方法,而consumer则提供了pull、push以及stream三类消费方式。