1. Building the network
model.py — largely the same as the MNIST example on the TensorFlow website; if anything is unclear, the official code walkthrough there explains it.
- import tensorflow as tf
- # the input is 28x28 here, which makes the later RKNN quantization more convenient
- x = tf.placeholder("float", [None, 28,28],name='x')
- y_ = tf.placeholder("float", [None,10],name='y_')
- keep_prob = tf.placeholder("float", name='keep_prob')
- def weight_variable(shape,name):
- initial = tf.truncated_normal(shape, stddev=0.1)
- return tf.Variable(initial,name=name)
- def bias_variable(shape,name):
- initial = tf.constant(0.1, shape=shape)
- return tf.Variable(initial,name=name)
- def conv2d(x, W):
- return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
- def max_pool_2x2(x):
- return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
- strides=[1, 2, 2, 1], padding='SAME')
- # convolution layer
- def lenet5_layer(input, weight, bias,weight_name,bias_name):
- W_conv = weight_variable(weight,weight_name)
- b_conv = bias_variable(bias,bias_name)
- h_conv = tf.nn.relu(conv2d(input, W_conv) + b_conv)
- return max_pool_2x2(h_conv)
- # fully connected layer
- def dense_layer(layer, weight, bias,weight_name,bias_name):
- W_fc = weight_variable(weight,weight_name)
- b_fc = bias_variable(bias,bias_name)
- return tf.nn.relu(tf.matmul(layer, W_fc) + b_fc)
- def build_model(is_training):
- #first conv
- x_image = tf.reshape(x, [-1,28,28,1])
- W_conv1 = [5, 5, 1, 32]
- b_conv1 = [32]
- layer1 = lenet5_layer(x_image,W_conv1,b_conv1,'W_conv1','b_conv1')
- #second conv
- W_conv2 = [5, 5, 32, 64]
- b_conv2 = [64]
- layer2 = lenet5_layer(layer1,W_conv2,b_conv2,'W_conv2','b_conv2')
- #first fully connected layer
- W_fc1 = [7 * 7 * 64, 1024]
- b_fc1 = [1024]
- layer2_flat = tf.reshape(layer2, [-1, 7*7*64])
- layer3 = dense_layer(layer2_flat,W_fc1,b_fc1,'W_fc1','b_fc1')
- #softmax
- W_fc2 = weight_variable([1024, 10],'W_fc2')
- b_fc2 = bias_variable([10],'b_fc2')
- if is_training:
- #dropout
- h_fc1_drop = tf.nn.dropout(layer3, keep_prob)
- finaloutput=tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2,name="y_conv")
- else:
- finaloutput=tf.nn.softmax(tf.matmul(layer3, W_fc2) + b_fc2,name="y_conv")
- print('finaloutput:', finaloutput)
- return finaloutput
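The placeholder name 'x' and the output name 'y_conv' defined above are referenced again when the graph is frozen and converted to RKNN, so it is worth confirming them once. A minimal sanity-check sketch (an illustration added here, not part of the original scripts):
- # sanity check: confirm the tensor names that the freeze and RKNN steps rely on
- import tensorflow as tf
- from model import build_model, x
- y_conv = build_model(is_training=False)
- print(x)       # expected along the lines of: Tensor("x:0", shape=(?, 28, 28), dtype=float32)
- print(y_conv)  # expected along the lines of: Tensor("y_conv:0", shape=(?, 10), dtype=float32)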
2. Training the network
train.py. The code here is also compatible with TensorFlow's fake-quantization path; for this tutorial simply set the is_quantify argument of create_training_graph() to False. Since the MNIST train and test data both come with shape (784,), a reshape_batch function is defined to reshape each training batch, as well as the test inputs, to (28, 28). The code is as follows:
- # -*- coding=utf-8 -*-
- import tensorflow as tf
- from tensorflow.examples.tutorials.mnist import input_data
- from model import build_model, x, y_, keep_prob
- mnist = input_data.read_data_sets("../MNIST_data/", one_hot=True)
- def create_training_graph(is_quantify):
- # build the training graph and, if requested, insert create_training_graph:
- g = tf.get_default_graph() # the default graph, passed as the argument to create_training_graph
- # call the model definition, i.e. get the output tensor
- y_conv = build_model(is_training=True) # is_training=True here because the model definition applies dropout during training
- # loss function
- cross_entropy = -tf.reduce_sum(y_*tf.log(y_conv))
- print('cost:', cross_entropy)
- if is_quantify:
- # insert create_training_graph here; it must come after the loss and before the optimizer
- tf.contrib.quantize.create_training_graph(input_graph=g, quant_delay=0)
- # optimize
- optimize = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
- # compute the accuracy
- correct_prediction = tf.equal(tf.argmax(y_conv,1), tf.argmax(y_,1))
- # correct_prediction is a list of booleans; cast it to floats and take the mean,
- # e.g. [True, False, True, True] becomes [1, 0, 1, 1], giving an accuracy of 0.75
- accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
- # return everything the training loop needs
- return dict(
- x=x,
- y=y_,
- keep_prob=keep_prob,
- optimize=optimize,
- cost=cross_entropy,
- correct_prediction=correct_prediction,
- accuracy=accuracy,
- )
- def reshape_batch(batch):
- rebatch = []
- for item in batch:
- b = item.reshape(28,28)
- rebatch.append(b)
- return rebatch
- # training loop
- def train_network(graph,ckpt,point_dir,pbtxt):
- # variable initializer
- init = tf.global_variables_initializer()
- # Saver for writing the checkpoint files
- saver = tf.train.Saver()
- # create the session and run the initializer
- with tf.Session() as sess:
- sess.run(init)
- # train for 20000 steps; accuracy reaches over 99%
- for i in range(20000):
- # 50 images per batch
- batch = mnist.train.next_batch(50)
- # every 100 steps, evaluate and print the training accuracy
- if i % 100 == 0:
- # feed_dict feeds the data; all images are reshaped to 28x28
- train_accuracy = sess.run([graph['accuracy']], feed_dict={
- graph['x']:reshape_batch(batch[0]), # batch[0] holds the image data
- graph['y']:batch[1], # batch[1] holds the labels
- graph['keep_prob']: 1.0})
- print("step %d, training accuracy %g"%(i, train_accuracy[0]))
- sess.run([graph['optimize']], feed_dict={
- graph['x']:reshape_batch(batch[0]),
- graph['y']:batch[1],
- graph['keep_prob']:0.5})
- test_accuracy = sess.run([graph['accuracy']], feed_dict={
- graph['x']: reshape_batch(mnist.test.images),
- graph['y']: mnist.test.labels,
- graph['keep_prob']: 1.0})
- print("Test accuracy %g" % test_accuracy[0])
- # save the checkpoint (ckpt) and the pbtxt graph; remember to change the paths to your own
- saver.save(sess, ckpt)
- tf.train.write_graph(sess.graph_def,point_dir,pbtxt, True)
- print(tf.trainable_variables())
- # print(tf.get_variable('W_fc2',[1024, 10]).value)  # buggy debug line: get_variable would create a new variable here, and .value is a method; inspect weights with sess.run inside the session instead
- if __name__ == "__main__":
- ckpt = './checkpoint/mnist.ckpt'
- point_dir = './checkpoint'
- pbtxt = 'mnist.pbtxt'
- g1 = create_training_graph(False)
- train_network(g1,ckpt,point_dir,pbtxt)
3. Saving the network parameters
freese.py reads the parameters and variables of the network out of the checkpoint and saves them into a pb file. As in the previous step, just set is_quantify of the frozen() function to False:
- import tensorflow as tf
- import os.path
- from model import build_model
- from tensorflow.python.framework import graph_util
- # build the inference graph
- def create_inference_graph():
- """Build the mnist model for evaluation."""
- # call the model; create an output to use for inference
- logits = build_model(is_training=False)
- return logits
- # # classification output
- # tf.nn.softmax(logits, name='output')
- def load_variables_from_checkpoint(sess, start_checkpoint):
- """Utility function to centralize checkpoint restoration.
- Args:
- sess: TensorFlow session.
- start_checkpoint: Path to saved checkpoint on disk.
- """
- saver = tf.train.Saver(tf.global_variables())
- saver.restore(sess, start_checkpoint)
- def frozen(is_quantify,ckpt,pbtxt):
- # Create the model and load its weights.
- init = tf.global_variables_initializer()
- with tf.Session() as sess:
- sess.run(init)
- # inference graph
- logits = create_inference_graph()
- # insert create_eval_graph() to rewrite the graph into the form the tflite converter expects; adjust any paths in the statements below
- if is_quantify:
- tf.contrib.quantize.create_eval_graph()
- load_variables_from_checkpoint(sess, ckpt)
- # Turn all the variables into inline constants inside the graph and save it.
- # freeze: combine the checkpoint weights (ckpt) with the graph (pbtxt)
- frozen_graph_def = graph_util.convert_variables_to_constants(
- sess, sess.graph_def, ['y_conv'])
- # save the final pb model
- tf.train.write_graph(
- frozen_graph_def,
- os.path.dirname(pbtxt),
- os.path.basename(pbtxt),
- as_text=False)
- tf.logging.info('Saved frozen graph to %s', pbtxt)
- if __name__ == "__main__":
- ckpt = './checkpoint/mnist.ckpt'
- pbtxt = 'mnist_frozen_graph.pb'
- frozen(False,ckpt,pbtxt)
- #is_quantify False mnist_frozen_graph_not_28x28.pb
- # ckpt = './checkpoint_not/mnist.ckpt'
- # pbtxt = 'mnist_frozen_graph_not.pb'
- # frozen(False,ckpt,pbtxt)
- # ckpt = './test/mnist.ckpt'
- # pbtxt = 'test.pb'
- # frozen(False,ckpt,pbtxt)
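Before converting, it can be worth confirming that the frozen pb really contains the input and output nodes the RKNN conversion will refer to by name. A small verification sketch (an illustration added here, assuming mnist_frozen_graph.pb sits in the current directory):
- # list the node names the RKNN conversion relies on in the frozen graph
- import tensorflow as tf
- gd = tf.GraphDef()
- with open('./mnist_frozen_graph.pb', 'rb') as f:
-     gd.ParseFromString(f.read())
- print([n.name for n in gd.node if n.name in ('x', 'y_conv')])  # expect ['x', 'y_conv']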
4. Converting the pb model to RKNN
Quantizing the RKNN model requires a matching image set, so the dataset has to be prepared first: go into the MNIST data directory, decompress t10k-images-idx3-ubyte.gz, and then run get_image.py, which converts the packed data back into images and at the same time produces the dataset.txt file needed for quantization.
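For the decompression step, a minimal Python sketch (paths assumed from the text above; a plain gunzip on the command line works just as well), writing to the file name that get_image.py reads:
- # decompress the MNIST test-image archive into the file that get_image.py expects
- import gzip, shutil
- f_in = gzip.open('../MNIST_data/t10k-images-idx3-ubyte.gz', 'rb')
- with open('../MNIST_data/t10k-images.idx3-ubyte', 'wb') as f_out:
-     shutil.copyfileobj(f_in, f_out)
- f_in.close()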
get_image.py:
- import struct
- import numpy as np
- #import matplotlib.pyplot as plt
- import PIL.Image
- from PIL import Image
- import os
- os.system("mkdir ../MNIST_data/mnist_test")
- filename='../MNIST_data/t10k-images.idx3-ubyte'
- dataset = './dataset.txt'
- binfile=open(filename,'rb')
- buf=binfile.read()
- index=0
- data_list = []
- magic,numImages,numRows,numColumns=struct.unpack_from('>IIII',buf,index)
- index+=struct.calcsize('>IIII')
- for image in range(0,numImages):
- im=struct.unpack_from('>784B',buf,index)
- index+=struct.calcsize('>784B')
- im=np.array(im,dtype='uint8')
- im=im.reshape(28,28)
- im=Image.fromarray(im)
- im.save('../MNIST_data/mnist_test/test_%s.jpg'%image,'jpeg')
- data_list.append('../MNIST_data/mnist_test/test_%s.jpg\n'%image)
- with open(dataset,'w+') as ff:
- ff.writelines(data_list)
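Since rknn.build() will read images from the paths listed in dataset.txt during quantization, a quick check that the file really points at existing images can save a failed conversion later (an illustrative snippet, not part of the original script):
- # dataset.txt should contain one existing image path per line
- import os
- with open('./dataset.txt') as f:
-     paths = [line.strip() for line in f if line.strip()]
- print(len(paths), 'entries; first exists:', os.path.exists(paths[0]))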
rknn_transfer.py:
- from rknn.api import RKNN
- def common_transfer(pb_name,export_name):
- ret = 0
- # pass verbose=True to see detailed logs
- rknn = RKNN()
- # not needed for grayscale input
- # rknn.config(channel_mean_value='', reorder_channel='')
- print('--> Loading model')
- ret = rknn.load_tensorflow(
- tf_pb=pb_name,
- inputs=['x'],
- outputs=['y_conv'],
- input_size_list=[[28,28,1]])
- if ret != 0:
- print('load_tensorflow error')
- rknn.release()
- return ret
- print('done')
- print('--> Building model')
- rknn.build(do_quantization=False)
- print('done')
- # export and save the RKNN model file
- rknn.export_rknn(export_name)
- # Release RKNN Context
- rknn.release()
- return ret
- def quantify_transfer(pb_name,dataset_name,export_name):
- ret = 0
- print(pb_name,dataset_name,export_name)
- rknn = RKNN()
- rknn.config(channel_mean_value='', reorder_channel='',quantized_dtype='dynamic_fixed_point-8')
- print('--> Loading model')
- ret = rknn.load_tensorflow(
- tf_pb=pb_name,
- inputs=['x'],
- outputs=['y_conv'],
- input_size_list=[[28,28,1]])
- if ret != 0:
- print('load_tensorflow error')
- rknn.release()
- return ret
- print('done')
- print('--> Building model')
- rknn.build(do_quantization=True,dataset=dataset_name)
- print('done')
- # export and save the RKNN model file
- rknn.export_rknn(export_name)
- # Release RKNN Context
- rknn.release()
- return ret
- if __name__ == '__main__':
- # convert the pb model to an RKNN model
- pb_name = './mnist_frozen_graph.pb'
- export_name = './mnist.rknn'
- ret = common_transfer(pb_name,export_name)
- if ret != 0:
- print('======common transfer error !!===========')
- else:
- print('======common transfer ok !!===========')
- dataset_name = './dataset.txt'
- export_name = './mnist_quantization.rknn'
- # convert the pb model to a quantized RKNN model
- ret = quantify_transfer(pb_name,dataset_name,export_name)
- if ret != 0:
- print('======quantization transfer 10000 error !!===========')
- else:
- print('======quantization transfer 10000 ok !!===========')
5. Comparing the inference results and accuracy of the pb and RKNN models
Run tf_predict.py and rknn_predict.py to get the results of the TF model, the plain RKNN model, and the quantized RKNN model respectively:
tf_predict.py
- #! -*- coding: utf-8 -*-
- from __future__ import absolute_import, unicode_literals
- from tensorflow.examples.tutorials.mnist import input_data
- import tensorflow as tf
-
- mnist = input_data.read_data_sets("../MNIST_data/", one_hot=True)
- origin_test = mnist.test.images
- reshape_test = []
- for t in origin_test:
- b = t.reshape(28,28)
- reshape_test.append(b)
- for length in [100,500,1000,10000]:
- with tf.Graph().as_default():
- output_graph_def = tf.GraphDef()
- output_graph_path = './mnist_frozen_graph.pb'
- with open(output_graph_path, 'rb') as f:
- output_graph_def.ParseFromString(f.read())
- _ = tf.import_graph_def(output_graph_def, name="")
-
- with tf.Session() as sess:
- sess.run(tf.global_variables_initializer())
- input = sess.graph.get_tensor_by_name("x:0")
- output = sess.graph.get_tensor_by_name("y_conv:0")
- y_conv_2 = sess.run(output, feed_dict={input:reshape_test[0:length]})
- y_2 = mnist.test.labels[0:length]
- print("first image:",y_conv_2[0])
- correct_prediction_2 = tf.equal(tf.argmax(y_conv_2, 1), tf.argmax(y_2, 1))
- accuracy_2 = tf.reduce_mean(tf.cast(correct_prediction_2, "float"))
- print('%d:'%length,"check accuracy %g" % sess.run(accuracy_2))
rknn_predict.py
- import numpy as np
- from PIL import Image
- from rknn.api import RKNN
- import cv2
- from tensorflow.examples.tutorials.mnist import input_data
- import tensorflow as tf
-
- mnist = input_data.read_data_sets("../MNIST_data/", one_hot=True)
- print(mnist.test.images[0].shape)
- # parse the model output: return the digit with the highest probability, together with that probability
- def get_predict(probability):
- data = probability[0][0]
- data = data.tolist()
- max_prob = max(data)
- return data.index(max_prob), max_prob
- # return data.index(max_prob), max_prob;
- def load_model(model_name):
- # create the RKNN object
- rknn = RKNN()
- # load the RKNN model
- print('-->loading model')
- rknn.load_rknn(model_name)
- print('loading model done')
- # initialize the RKNN runtime environment
- print('--> Init runtime environment')
- ret = rknn.init_runtime()
- if ret != 0:
- print('Init runtime environment failed')
- exit(ret)
- print('done')
- return rknn
- def predict(rknn,length):
- acc_count = 0
- for i in range(length):
- # im = mnist.test.images[i]
- im = Image.open("../MNIST_data/mnist_test/test_%d.jpg"%i) # load the image
- im = im.resize((28,28),Image.ANTIALIAS)
- im = np.asarray(im)
- outputs = rknn.inference(inputs=[im])
- pred, prob = get_predict(outputs)
- if i ==0:
- print(outputs)
- print(prob)
- print(pred)
- if i ==100 or i ==500 or i ==1000 or i ==10000:
- result = float(acc_count)/i
- print('result%d:'%i,result)
- if list(mnist.test.labels[i]).index(1) == pred:
- acc_count += 1
- result = float(acc_count)/length
- print('result:',result)
- # acc_count = 0
- # length = len(mnist.test.images)
- # for i in range(length):
- # im = mnist.test.images[i] # load the image
- # outputs = rknn.inference(inputs=[im]) # run inference and get the result
- # pred, prob = get_predict(outputs) # turn the inference result into readable values
- # if i%100 == 0:
- # print(prob)
- # print(pred)
- # print(acc_count)
- # print(list(mnist.test.labels[i]).index(1))
- # if list(mnist.test.labels[i]).index(1) == pred:
- # acc_count += 1
- # result = float(acc_count)/length
- # print('result:',result)
- if __name__=="__main__":
- # change this to the quantized or non-quantized RKNN model as appropriate
- model_name = './mnist.rknn'
- length = 10000
- rknn = load_model(model_name)
- predict(rknn,length)
-
- rknn.release()
The final comparison chart of the results: