使用go来消费kafka信息,然后写入到mongo。

kafka

kafka消费者

package kafka

import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	"godemo/hiwangqi.com/study/other/cron/util/mongo"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"
)

var (
	wg sync.WaitGroup
)

// 日志对象
type Syslog struct {
	ID int64 `json:"log_id"`
	LogValue string	`json:"content"`
	LogTime string	`json:"log_time"`
}

var syslog *Syslog

func Customer() {
	consumer, err := sarama.NewConsumer([]string{"192.168.3.66:9092"}, nil)

	if err != nil {
		panic(err)
	}

	partitionList, err := consumer.Partitions("hiwangqi")
	if err != nil {
		panic(err)
	}

	// 遍历分区
	for _, p := range partitionList {
		//创建了相应的分区消费者
		//如果该分区消费者已经消费了该信息将会返回error
		//sarama.OffsetNewest:表明了为最新消息
		pc, err := consumer.ConsumePartition("hiwangqi", p, sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}

		defer pc.AsyncClose()

		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {

				// 处理kafka数据
				err := json.Unmarshal([]byte(msg.Value),&syslog)
				if err != nil {
					fmt.Println(err)
					return
				}

				// 写入Mongodb
				mgo := mongo.NewMgo("pay","paylog")
				cid := mgo.InsertOne(&syslog)
				fmt.Println(cid)

				//fmt.Fprintf(os.Stdout,"%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)

	}
	wg.Wait()
}

mongodb

package mongo

import (
	"context"
	"fmt"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"time"
)

type Database struct {
	Mongo  * mongo.Client
}


var DB *Database


//初始化
func Init() {
	DB = &Database{
		Mongo: SetConnect(),
	}
}
// 连接设置
func SetConnect() *mongo.Client{
	uri := "mongodb://192.168.3.66:27017"
	ctx ,cancel := context.WithTimeout(context.Background(),10*time.Second)
	defer cancel()
	client, err := mongo.Connect(ctx,options.Client().ApplyURI(uri).SetMaxPoolSize(20)) // 连接池
	if err !=nil{
		fmt.Println(err)
	}
	return client
}

mgo.go mongo增删改查

package mongo

import (
	"fmt"
	"context"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"strconv"
	"time"
)



type mgo struct {
	database   string
	collection string
}

func NewMgo(database, collection string) *mgo {

	return &mgo{
		database,
		collection,
	}
}

// 查询单个
func (m *mgo) FindOne(key string, value interface{}) *mongo.SingleResult {
	client := DB.Mongo
	collection, _ := client.Database(m.database).Collection(m.collection).Clone()
	//collection.
	filter := bson.D{{key, value}}
	singleResult := collection.FindOne(context.TODO(), filter)
	return singleResult
}

//插入单个
func (m *mgo) InsertOne(value interface{}) string {
	client := DB.Mongo
	collection := client.Database(m.database).Collection(m.collection)
	insertResult, err := collection.InsertOne(context.TODO(), value)
	if err != nil {
		fmt.Println(err)
	}
	return insertResult.InsertedID.(primitive.ObjectID).Hex()
}

//查询集合里有多少数据
func (m *mgo) CollectionCount() (string, int64) {
	client := DB.Mongo
	collection := client.Database(m.database).Collection(m.collection)
	name := collection.Name()
	size, _ := collection.EstimatedDocumentCount(context.TODO())
	return name, size
}

//按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
func (m *mgo) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
	client := DB.Mongo
	collection := client.Database(m.database).Collection(m.collection)
	SORT := bson.D{{"_id", sort}} //filter := bson.D{{key,value}}
	filter := bson.D{{}}
	findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
	//findOptions.SetLimit(i)
	temp, _ := collection.Find(context.Background(), filter, findOptions)
	return temp
}

//获取集合创建时间和编号
func (m *mgo) ParsingId(result string) (time.Time, uint64) {
	temp1 := result[:8]
	timestamp, _ := strconv.ParseInt(temp1, 16, 64)
	dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
	temp2 := result[18:]
	count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
	return dateTime, count
}

//删除文章和查询文章
func (m *mgo) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
	client := DB.Mongo
	collection := client.Database(m.database).Collection(m.collection)
	filter := bson.D{{key, value}}
	singleResult := collection.FindOne(context.TODO(), filter)
	DeleteResult, err := collection.DeleteOne(context.TODO(), filter, nil)
	if err != nil {
		fmt.Println("删除时出现错误,你删不掉的~")
	}
	return DeleteResult.DeletedCount, singleResult
}

//删除文章
func (m *mgo) Delete(key string, value interface{}) int64 {
	client := DB.Mongo
	collection := client.Database(m.database).Collection(m.collection)
	filter := bson.D{{key, value}}
	count, err := collection.DeleteOne(context.TODO(), filter, nil)
	if err != nil {
		fmt.Println(err)
	}
	return count.DeletedCount

}

//删除多个
func (m *mgo) DeleteMany(key string, value interface{}) int64 {
	client := DB.Mongo
	collection := client.Database(m.database).Collection(m.collection)
	filter := bson.D{{key, value}}

	count, err := collection.DeleteMany(context.TODO(), filter)
	if err != nil {
		fmt.Println(err)
	}
	return count.DeletedCount
}

main.go

package main

import (
	"github.com/json-iterator/go"
	"godemo/hiwangqi.com/study/other/cron/util/kafka"
	"godemo/hiwangqi.com/study/other/cron/util/mongo"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

func main() {
	mongo.Init()
	kafka.Customer()
}

编译可执行文件

mac下编译Linux可执行文件

GOOS=linux GOARCH=amd64 go build main.go

windows和mac下编译Linux可执行文件

# mac
SET CGO_ENABLED=0
SET GOOS=darwin
SET GOARCH=amd64
go build main.go
# linux
SET CGO_ENABLED=0
SET GOOS=linux
SET GOARCH=amd64
go build main.go

守护进程配置

基于supervisor实现守护进程。

给go服务应用创建一个配置文件

# /opt/conf.d 配置文件目录
[root@node1 conf.d]# vim syslog.ini
[program:syslog]
command=/services/go_services/sysmessage/main
stderr_logfile=/services/go_services/sysmessage/err.log
stdout_logfile=/services/go_services/sysmessage/out.log
user=root
stopsignal=INT

启动supervisor

[root@node1 conf.d]# supervisord -c /etc/supervisord.conf
[root@node1 conf.d]# ps -ef |grep main
root      46431  46428  0 14:40 ?        00:00:00 /services/go_services/sysmessage/main
root      46458  38513  0 14:41 pts/0    00:00:00 grep --color=auto main