// Save this code as sample-publish.go.
//
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Go sample for connecting to Omnicore via MQTT, using JWT.
//
// This example connects to Omnicore via MQTT, using a JWT for device
// authentication. After connecting, the device publishes 10 messages to its
// MQTT telemetry (events) topic, and then exits.
//
// Before you run the sample, you must follow the instructions in the README
// for this sample.
package main
import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"log"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/golang-jwt/jwt"
)
// Command-line flags. Each variable is a pointer that holds the flag's default
// until flag.Parse fills in the value given on the command line.
var (
	// Identifiers that together name the device; main joins them into the
	// MQTT client ID and the topic paths.
	subscriptionID = flag.String("subscription", "", "Omnicore Subscription ID")
	registryID     = flag.String("registry", "", "Omnicore Registry ID (short form)")
	deviceID       = flag.String("device", "", "Omnicore Device ID")

	// bridge groups the host and port of the MQTT bridge endpoint.
	bridge = struct {
		host, port *string
	}{
		host: flag.String("mqtt_host", "lowercased(subscriptionid).mqtt.korewireless.com", "MQTT Bridge Host"),
		port: flag.String("mqtt_port", "8883", "MQTT Bridge Port"),
	}

	// Paths to the PEM files: the root certificates and the device's private
	// key.
	certsCA    = flag.String("ca_certs", "", "Download https://pki.cloud.korewireless.com/roots.pem")
	privateKey = flag.String("private_key", "", "Path to private key file")
)
// loadRootCAs returns the certificate pool used to verify the MQTT bridge's
// server certificate.
//
// When caFile is empty it returns a nil pool, which makes crypto/tls fall back
// to the host's root CA set. Otherwise the PEM certificates in caFile become
// the only trusted roots; an unreadable file, or a file that yields no
// certificate, is reported as an error instead of being silently ignored.
func loadRootCAs(caFile string) (*x509.CertPool, error) {
	if caFile == "" {
		return nil, nil
	}
	pemCerts, err := ioutil.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("reading CA certificates: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pemCerts) {
		return nil, fmt.Errorf("no PEM certificates found in %q", caFile)
	}
	return pool, nil
}

// main parses the command-line flags, connects to the MQTT bridge over TLS
// using an RS256-signed JWT as the password, publishes ten messages to the
// device's events topic, and disconnects.
//
// Any failure while preparing or establishing the connection terminates the
// process via log.Fatal. A failed or timed-out publish is logged and the loop
// moves on to the next message.
func main() {
	log.Println("[main] Entered")

	log.Println("[main] Flags")
	flag.Parse()

	log.Println("[main] Loading Omnicore's roots")
	roots, err := loadRootCAs(*certsCA)
	if err != nil {
		log.Fatalf("[main] %v", err)
	}

	log.Println("[main] Creating TLS Config")
	// Server certificate verification stays enabled (no InsecureSkipVerify):
	// the JWT is sent as the MQTT password, so the peer must be authenticated.
	config := &tls.Config{
		RootCAs:    roots,
		MinVersion: tls.VersionTLS12,
	}

	clientID := fmt.Sprintf("subscriptions/%v/registries/%v/devices/%v",
		*subscriptionID,
		*registryID,
		*deviceID,
	)

	log.Println("[main] Creating MQTT Client Options")
	opts := MQTT.NewClientOptions()

	broker := fmt.Sprintf("ssl://%v:%v", *bridge.host, *bridge.port)
	log.Printf("[main] Broker '%v'", broker)

	opts.AddBroker(broker)
	opts.SetClientID(clientID).SetTLSConfig(config)

	// The username is a placeholder; the credential is the JWT set as the
	// password below.
	opts.SetUsername("unused")

	// Take the time once so both claims derive from the same instant.
	now := time.Now()
	jwtToken := jwt.New(jwt.SigningMethodRS256)
	jwtToken.Claims = jwt.StandardClaims{
		IssuedAt:  now.Unix(),
		ExpiresAt: now.Add(24 * time.Hour).Unix(),
	}

	log.Println("[main] Load Private Key")
	keyBytes, err := ioutil.ReadFile(*privateKey)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("[main] Parse Private Key")
	key, err := jwt.ParseRSAPrivateKeyFromPEM(keyBytes)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("[main] Sign String")
	tokenString, err := jwtToken.SignedString(key)
	if err != nil {
		log.Fatal(err)
	}

	opts.SetPassword(tokenString)

	// Incoming
	opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
		fmt.Printf("[handler] Topic: %v\n", msg.Topic())
		// Payload is a []byte; %s prints it as text rather than as a list of
		// decimal byte values.
		fmt.Printf("[handler] Payload: %s\n", msg.Payload())
	})

	log.Println("[main] MQTT Client Connecting")
	client := MQTT.NewClient(opts)
	if connectToken := client.Connect(); connectToken.Wait() && connectToken.Error() != nil {
		log.Fatal(connectToken.Error())
	}

	// Topic paths for this device. Only telemetry is used by this sample.
	topic := struct {
		config    string
		telemetry string
		loopback  string
	}{
		config:    fmt.Sprintf("/%s/%s/config", *registryID, *deviceID),
		telemetry: fmt.Sprintf("/%s/%s/events", *registryID, *deviceID),
		loopback:  fmt.Sprintf("/%s/%s/loopback", *registryID, *deviceID),
	}

	log.Println("[main] Publishing Messages To Events Telemetry Topic")
	for i := 0; i < 10; i++ {
		log.Printf("[main] Publishing Message #%d", i)
		pubToken := client.Publish(
			topic.telemetry,
			0,
			false,
			fmt.Sprintf("Message: %d", i))
		// WaitTimeout reports false when the publish did not complete in
		// time; a completed token may still carry an error.
		if !pubToken.WaitTimeout(5 * time.Second) {
			log.Printf("[main] Publishing Message #%d timed out", i)
			continue
		}
		if err := pubToken.Error(); err != nil {
			log.Printf("[main] Publishing Message #%d failed: %v", i, err)
		}
	}

	log.Println("[main] MQTT Client Disconnecting")
	// The argument is the number of milliseconds Disconnect waits for
	// in-flight work before closing the connection.
	client.Disconnect(250)

	log.Println("[main] Done")
}