C#使用SqlBulkCopy向Sqlserver数据库批量更新插入数据的例子
C#使用SqlBulkCopy向Sqlserver数据库大批量更新插入数据数据库的例子
目前负责开发维护的一个项目是一个24小时被动收取文件数据的项目,上一代版本我觉得逻辑比较复杂(本人比较水有些代码理解不了,而且上一代开发人员跑路了0.0)就自己独立拆分开发了一套
SqlBulkCopy也是第一次接触,主要是为了解决项目的数据库批量更新插入问题,项目每日数据量一天大概几十万数据(这是排除了重复发送的文件。。。,目标客户是医院,搞过的都懂)这里就碰到了数据库写入的问题,按收一个文件写一条数据库的逻辑运行一段时间的观察结果就是医院工作时间内程序就一直访问数据库 几十万单条写入可想而知对数据库压力有多大(其他程序还要读取操作),同时还在向硬盘写入文件,就时不时出现磁盘满载或者数据库cpu过高问题,加之服务器也不是这个项目专属的,只好想办法优化程序数据库批量写入逻辑。
以下是一个使用SqlBulkCopy 大批量更新插入数据库数据的例子 (这里针对项目实体做了反射封装,以下代码项目已经稳定运行半年有余)
实体基类方法
/// <summary>
/// 基础公共属性操作类
/// </summary>
public class BaseMethod
{
public BaseMethod()
{
Id = GenerateGuid();
CreateTime = DateTime.Now;
}
#region 公共部分
public const String C_Id = "Id";
public const String C_CreateTime = "CreateTime";
/// <summary>
/// 表主键 GUID
/// </summary>
[ExplicitKey]
public Guid Id { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreateTime { get; set; }
#endregion
/// <summary>
/// 创建有序GUID
/// </summary>
/// <returns></returns>
protected static Guid GenerateGuid()
{
byte[] guidArray = Guid.NewGuid().ToByteArray();
var baseDate = new DateTime(1900, 1, 1);
DateTime now = DateTime.Now;
var days = new TimeSpan(now.Ticks - baseDate.Ticks);
TimeSpan msecs = now.TimeOfDay;
byte[] daysArray = BitConverter.GetBytes(days.Days);
byte[] msecsArray = BitConverter.GetBytes((long)(msecs.TotalMilliseconds / 3.333333));
Array.Reverse(daysArray);
Array.Reverse(msecsArray);
Array.Copy(daysArray, daysArray.Length - 2, guidArray, guidArray.Length - 6, 2);
Array.Copy(msecsArray, msecsArray.Length - 4, guidArray, guidArray.Length - 4, 4);
return new Guid(guidArray);
}
private static List<string> PublicFields
{
get
{
Type baseClass = typeof(BaseMethod);
List<string> fields = baseClass.GetFields().Where(e => e.IsPublic & e.Name.StartsWith("C_"))
.Select(e => e.GetValue(null).ToString()).ToList();
return fields;
}
}
/// <summary>
/// 获取数据表字段 优化查询性能 避免 SELECT *
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static List<string> GetTableFields<T>() where T : BaseMethod
{
List<string> TableFields = new List<string>();
TableFields.AddRange(PublicFields);
Type type = typeof(T).BaseType;
List<string> fields = type.GetFields().Where(e => e.IsPublic & e.Name.StartsWith("C_"))
.Select(e => e.GetValue(null).ToString()).ToList();
TableFields.AddRange(fields);
return TableFields;
}
}
泛型实体调用SqlBulkCopy
/// <summary>
/// 任务队列 负责业务分发
/// </summary>
public class TaskQueue<T> where T: BaseMethod
{
/// <summary>
/// SqlBulkCopy
/// </summary>
DataTable SqlBulkCopyTable = new DataTable();
/// <summary>
/// 泛型类
/// </summary>
Type BaseType = typeof(T);
/// <summary>
/// 字段
/// </summary>
static List<string> TableFields = BaseMethod.GetTableFields<T>();
/// <summary>
/// 字符串
/// </summary>
string DbConn;
public TaskQueue(string DbConn)
{
this.DbConn = DbConn;
}
/// <summary>
/// 批量插入数据
/// </summary>
/// <param name="DcmInfoQueue"></param>
/// <returns></returns>
public bool BatchInsert(List<T> NewQueue)
{
try
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(DbConn, SqlBulkCopyOptions.FireTriggers))
{
//本地化的datatable字段对象名称
var TableFields = BaseMethod.GetTableFields<T>();
#region 动态创建Table载体
///映射表名称
bulkCopy.DestinationTableName = BaseMethod.GetTableName<T>();
//DateTable列名映射对应数据列名
Type ValueType;
//创建列并对应列的值类型
foreach (string column in TableFields)
{
ValueType = BaseType.GetProperty(column).PropertyType;
try
{
if (ValueType.IsGenericType && ValueType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
ValueType = ValueType.GetGenericArguments()[0];
}
SqlBulkCopyTable.Columns.Add(column, ValueType);
}
catch (Exception ex) {
}
bulkCopy.ColumnMappings.Add(column, column);
}
#endregion
//插入数据行
for (int i = 0; i < NewQueue.Count; i++)
{
//新数据行
DataRow row = SqlBulkCopyTable.NewRow();
///数据行
T info = NewQueue[i];
//逐个插入行字段
foreach (string column in TableFields)
{
try
{
var pi = BaseType.GetProperty(column);
object value = pi.GetValue(info);
if (value is null)
{
continue;
}
else
{
row[column] = value;
}
}
catch (Exception ex)
{
}
}
SqlBulkCopyTable.Rows.Add(row);
}
//批量插入数据库
bulkCopy.WriteToServer(SqlBulkCopyTable);
}
return true;
}
catch (Exception ex)
{
//ILogger.Error("批量操作失败", ex);
return false;
}
}
}
1.首先创建一个数据类来测试(需要对应数据库字段)
/// <summary>
/// 数据库扩展信息
/// </summary>
public class DcmInfoExtend : BaseMethod
{
public const String TableName = "DcmInfo";
///数据库字段信息
public const String C_LocalPath = "LocalPath";
public const String C_IsDelete = "IsDelete";
public const String C_StartDate = "StartDate";
public const String C_EndDate = "EndDate";
/// <summary>
/// 全部的查询字段
/// </summary>
public static string SelectFields
{
get
{
List<string> TableFields = GetTableFields<DcmInfo>();
return string.Join(",", TableFields);
}
}
}
/// <summary>
/// dcm单个文件入库表
/// </summary>
[Table(TableName)]
public class DcmInfo : DcmInfoExtend
{
public DcmInfo()
{
Id = GenerateGuid();
}
/// <summary>
/// 本地文件路径
/// </summary>
public string LocalPath { get; set; }
/// <summary>
/// 文件删除状态
/// </summary>
public bool IsDelete { get; set; }
/// <summary>
/// 开始时间
/// </summary>
public DateTime StartDate { get; set; }
/// <summary>
/// 接收完成时间
/// </summary>
public DateTime EndDate { get; set; }
}
批量插入调用例子
///创建批量操作任务
List<DcmInfo> DcmFileQueue=new List<DcmInfo>();
//自行Add添加Dcminfo数据
DcmFileQueue.Add(new DcmInfo());
DcmFileQueue.Add(new DcmInfo());
....以此类推
TaskQueue<DcmInfo> taskQueue = new BusinessDistribution.TaskQueue<DcmInfo>(DbConn);
bool IsSuccess = taskQueue.BatchInsert(DcmFileQueue);//执行批量新增操作