Fetching the current value of a topic
A client can send a fetch request for the state of a topic. If the topic is of a type that maintains its state, the Diffusion™ server provides the current state of that topic to the client.
Required permissions:
and permissions for the specified topics
A fetch request enables you to get the current value of a topic without subscribing to the topic.
The values fetched from a topic are received through a fetch stream, not through any value streams or topic streams registered against the topic.
Fetch streams are not typed and are used to receive responses to fetch requests for all topics that match the topic selectors used when registering the fetch stream.
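The separation between fetch streams and value streams can be pictured with a toy in-memory model. The sketch below is not the Diffusion client API: `ToyServer`, its methods, and the plain path prefix that stands in for real topic selector syntax are all assumptions made for illustration. It shows a fetch delivering one response per matching topic to the fetch stream only, while a registered value stream receives nothing.

```javascript
// Toy model only: ToyServer and its methods are hypothetical, not Diffusion API.
class ToyServer {
  constructor() {
    this.topics = new Map();   // topic path -> current value
    this.valueStreams = [];    // callbacks registered by subscribers
  }

  // Set a topic's value and notify every registered value stream.
  update(path, value) {
    this.topics.set(path, value);
    this.valueStreams.forEach(function (stream) { stream(value, path); });
  }

  // Register a value stream (stands in for subscribing).
  subscribe(stream) {
    this.valueStreams.push(stream);
  }

  // One-shot fetch: send the current value of each matching topic to the
  // given fetch stream, then close it. Value streams are never notified.
  // Assumption: a plain path prefix stands in for a real topic selector.
  fetch(prefix, fetchStream) {
    this.topics.forEach(function (value, path) {
      if (path === prefix || path.startsWith(prefix + "/")) {
        fetchStream.value(value, path);
      }
    });
    fetchStream.close();
  }
}

const server = new ToyServer();
server.update("sports/football", "2-1");
server.update("sports/tennis", "6-4");
server.update("news/headline", "Rain");

const subscribed = [];  // what the value stream receives
const fetched = [];     // what the fetch stream receives
let closed = false;

server.subscribe(function (value, path) { subscribed.push(path); });
server.fetch("sports", {
  value: function (value, path) { fetched.push(path + "=" + value); },
  close: function () { closed = true; }
});

console.log(fetched);     // entries for the two topics under "sports"
console.log(subscribed);  // empty: the fetch did not reach the value stream
```

In this model the fetch stream is handed to the request itself, so its responses cannot leak into streams registered for subscriptions.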
Fetch the value of a topic or set of topics by using a topic selector to make a fetch request:
session.fetch("topic_selector").on('value', function(value, path) {
    // Do something
});
[session.topics fetchWithTopicSelectorExpression:@"topic_selector" delegate:self];
Topics topics = session.feature(Topics.class);
topics.fetch(topic_selector, new FetchStream.Default());
var topics = session.Topics;
topics.Fetch( topic_selector, new FetchStreamDefault() );
FETCH_PARAMS_T params = {
        .selector = topic_selector,
        .on_topic_message = on_topic_message,
        .on_fetch = on_fetch,
        .on_status_message = on_fetch_status_message
};

/*
 * Issue the fetch request.
 */
fetch(session, params);
These examples pass in a default or no-op fetch stream.
To make use of the fetched values, implement a fetch stream to handle the values:
session.fetch("topic_selector").on({
    value : function(value, path) {
        console.log("Value for topic '" + path + "' is: " + value);
    },
    error : function(error) {
        console.log("Error on fetch: " + error);
    },
    close : function() {
        console.log("Fetch stream closed.");
    }
});
@implementation FetchExample (PTDiffusionFetchStreamDelegate)

-(void)diffusionStream:(PTDiffusionStream * const)stream
     didFetchTopicPath:(NSString * const)topicPath
               content:(PTDiffusionContent * const)content
{
    NSLog(@"Fetch Result: %@ = \"%@\"", topicPath, content);
}

-(void)diffusionDidCloseStream:(PTDiffusionStream * const)stream
{
    NSLog(@"Fetch stream finished.");
}

-(void)diffusionStream:(PTDiffusionStream * const)stream
      didFailWithError:(NSError * const)error
{
    NSLog(@"Fetch stream failed error: %@", error);
}

@end
private final class myFetchStream implements Topics.FetchStream {

    @Override
    public void onClose() {
        LOG.info("Fetch stream closed.");
    }

    @Override
    public void onDiscard() {
        LOG.info("Fetch stream discarded.");
    }

    @Override
    public void onFetchReply(String topicPath, Content content) {
        LOG.info("Fetched value " + content.asString() + " from topic " + topicPath);
    }
}
internal sealed class myFetchStream : IFetchStream
{
    public void OnClose()
    {
        Console.WriteLine( "The fetch stream is now closed." );
    }

    public void OnDiscard()
    {
        Console.WriteLine( "The fetch stream was discarded." );
    }

    public void OnFetchReply( string topicPath, IContent content )
    {
        // Arguments follow the placeholder order: value first, then topic path.
        Console.WriteLine( "Fetched value {0} from topic {1}", content.AsString(), topicPath );
    }
}
/*
 * When a fetched message is received, this callback is invoked.
 */
static int
on_topic_message(SESSION_T *session, const TOPIC_MESSAGE_T *msg)
{
        printf("Received message for topic %s\n", msg->name);
        printf("Payload: %.*s\n", (int)msg->payload->len, msg->payload->data);

#ifdef DEBUG
        topic_message_debug(msg->payload);
#endif
        return HANDLER_SUCCESS;
}
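When all fetched values are needed together, a small helper can collect the replies until the fetch stream closes. The following is a minimal sketch, assuming only the handler shape used in the JavaScript example above (`session.fetch(selector).on({ value, error, close })`) and assuming that `close` is called after the last value. `collectFetch` and the stub session are hypothetical names, introduced so that the sketch runs without a server.

```javascript
// Hypothetical helper: gathers fetched values by topic path and hands them
// to onDone once the fetch stream closes, or reports the first error.
function collectFetch(session, selector, onDone) {
  const values = {};
  let failed = false;
  session.fetch(selector).on({
    value: function (value, path) { values[path] = value; },
    error: function (error) { failed = true; onDone(error, null); },
    close: function () { if (!failed) { onDone(null, values); } }
  });
}

// Stub session for illustration only; a real session comes from the client library.
const stubSession = {
  fetch: function (selector) {
    return {
      on: function (handlers) {
        handlers.value("42", "a/b");
        handlers.value("43", "a/c");
        handlers.close();
      }
    };
  }
};

let result = null;
collectFetch(stubSession, "topic_selector", function (error, values) {
  result = values;
});
console.log(result);
```

Keying the collected values by topic path keeps the helper usable with selectors that match more than one topic.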
This page last modified: 2015/02/02