Configure streaming of events

Several APIs provide streaming of events. All of them use events.proto for configuration.

Keep-Alive

The load balancer hosting api.wgtwo.com will silently drop connections if they have been idle for 350 seconds (5m 50s).

That is, if the stream has low traffic you may experience that the stream is down without that being visible for your client.

Our event services is set up to allow keep-alive requests every minute. We strongly recommend that you configure a keep-alive period of most five minutes as that will ensure your connection is not silently dropped.

SettingRecommended value
keep-alive period1 minute (Must be ≥ 1 - Recommended at most 5 minutes)
keep-alive timeout10 seconds
permit keep-alive without callstrue

Java example

var managedChannel = ManagedChannelBuilder.forTarget("api.wgtwo.com:443")
        .keepAliveWithoutCalls(true)
        .keepAliveTime(1, TimeUnit.MINUTES)
        .keepAliveTimeout(10, TimeUnit.SECONDS)
        .idleTimeout(1, TimeUnit.HOURS)
        .build()
1
2
3
4
5
6

Max connection age

Max connection age for our servers is, in general, set to 60 minutes with a random jitter of ± 6 minutes. After this the connection will be gracefully terminated.

General advice

Your client should be able to handle recovering from random disconnects. That is, the client should simply reconnect on errors.

In the case of authentication errors, you should wait for some period to avoid being rate limited.

Configure for development

Using a RegularStream instead of the default value will result in the following:

  • reading position will not be stored in the server
  • load is not spread between your clients

WARNING

This will make you lose events when you restart your application, and if you have multiple connections using the same OAuth 2.0 client ID, they will all receive all events (load is not spread between the clients).

These settings are not recommended for production use.

// All connected clients will see all events in the stream.
// Reading position is not stored at the server, so disconnecting will make it start fresh.
message RegularStream {}
// Regular:
//   Warning: This is intended for testing purposes only and is not recommended for production.
val request = SubscriptionEventsProto.StreamHandsetChangeEventsRequest.newBuilder()
    .setStreamConfiguration(
        EventsProto.StreamConfiguration.newBuilder()
            .setRegular(EventsProto.RegularStream.getDefaultInstance())
    )
    .build()
context.run {
    subscriptionEvents.streamHandsetChangeEvents(request, HandsetChangeObserver)
}
1
2
3
4
5
6
7
8
9

Configure for gRPCurl

Writing application code you would normally acknowledge events as you receive them. This is not something you can easily do when streaming with gRPCurlopen in new window. To fix this you must disable explicit acknowledgement.

Example usage

grpcurl -protoset wgtwo.bin \
  -H 'Authorization: Bearer UG5dO...'
  -d '
  {
    "stream_configuration": {
      "regular": {},
      "disable_explicit_ack": {}
    }
  }
  ' \
  api.wgtwo.com:443 \
  wgtwo.subscription.v1.SubscriptionEventService/StreamHandsetChangeEvents


 
 
 
 
 
 
 
 


1
2
3
4
5
6
7
8
9
10
11
12

This example also uses RegularStream.

  // Optional: By default, the client is required to send a ack message and will use a timeout of 30 seconds.
  oneof acknowledge_option {
    // Disable ack
    google.protobuf.Empty disable_explicit_ack = 3;
    // Must be between 10 seconds and 10 minutes
    google.protobuf.Duration custom_ack_timeout = 4;
  }



 



Configure for production

The default configuration is production ready. This configuration remembers queue position, so you don't lose any events, and spreads load across all servers using the same OAuth 2.0 client ID.

message StreamConfiguration {
  // Optional: Will use DurableQueue by default
  oneof stream_type {
    RegularStream regular = 1;
    DurableQueue durable_queue = 2;
  }

Manually acknowledge events

You also have to explicitly ack(nowledge) each event to let the streaming API know that you have indeed received the message, and it should not be sent again.

// Ack request, which is required for sending a ack of an event
message AckRequest{
  AckInfo ack_info = 1;
}
// This contains a opaque string which should be included in the ack request to identify the event
message AckInfo {
  string value = 1;
}
private val accessToken = "my_client_access_token"
private val channel = ManagedChannelBuilder.forAddress("api.wgtwo.com", 443).build()
private val subscriptionEvents: SubscriptionEventServiceGrpc.SubscriptionEventServiceStub =
    SubscriptionEventServiceGrpc.newStub(channel)
        .withCallCredentials(AccessToken(accessToken))
        .withWaitForReady()

// ...

private fun ack(event: StreamHandsetChangeEventsResponse) {
    val request = AckHandsetChangeEventRequest.newBuilder().setAckInfo(event.metadata.ackInfo).build()
    subscriptionEvents.ackHandsetChangeEvent(request, AckObserver)
}









 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13

Tweak max number of in-flight events

You can also tweak the default max_in_flight.

  // Optional: By default, max 50 unacknowledged events may be in-flight
  // Must be between 1 and 200
  uint32 max_in_flight = 5;
val request = SubscriptionEventsProto.StreamHandsetChangeEventsRequest.newBuilder()
    .setStreamConfiguration(
        EventsProto.StreamConfiguration.newBuilder()
            .setMaxInFlight(10)
    )
    .build()
context.run {
    subscriptionEvents.streamHandsetChangeEvents(request, HandsetChangeObserver)
}



 





1
2
3
4
5
6
7
8
9

Full streaming example

See Get handset change events.