Commit 226eb959 by guanchen

添加奖励类型

parent 08e831b6
package com.lanren.huhu.partner.config;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.support.spring.GenericFastJsonRedisSerializer;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @author houseme
* @date 2018/1/22 下午11:51
* File
*/
@Configuration
public class RedisConfiguration {
private static Logger logger = LoggerFactory.getLogger(RedisConfiguration.class);
static {
logger.info("加载redis配置");
ParserConfig.getGlobalInstance().addAccept("com.lanren.huhu.partner.model.");
ParserConfig.getGlobalInstance().addAccept("com.lanren.huhu.partner.result.");
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
}
@Bean
public RedisCacheManager getRedisCacheManager(RedisConnectionFactory connectionFactory) {
RedisCacheWriter cacheWriter = RedisCacheWriter.lockingRedisCacheWriter(connectionFactory);
GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
RedisSerializationContext.SerializationPair<Object> pair = RedisSerializationContext.SerializationPair.fromSerializer(fastJsonRedisSerializer);
RedisCacheConfiguration cacheConfig = RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(pair);
return new RedisCacheManager(cacheWriter, cacheConfig);
}
@Bean
@Scope("prototype")
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory);
GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
//设置默认的Serialize,包含 keySerializer & valueSerializer
redisTemplate.setDefaultSerializer(fastJsonRedisSerializer);
//单独设置valueSerializer
redisTemplate.setValueSerializer(fastJsonRedisSerializer);
//单独设置keySerializer
redisTemplate.setKeySerializer(fastJsonRedisSerializer);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setStringSerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
@Scope("prototype")
public RedisTemplate redisTemplatePush(@Value("${spring.redis.database2}") int pushDb,
@Value("${spring.redis.host}") String host,
@Value("${spring.redis.port}") int port,
@Value("${spring.redis.password}") String password) {
RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
configuration.setHostName(host);
configuration.setPort(port);
configuration.setDatabase(pushDb);
RedisPassword redisPassword = RedisPassword.of(password);
configuration.setPassword(redisPassword);
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder = LettucePoolingClientConfiguration.builder();
builder.poolConfig(genericObjectPoolConfig);
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration, builder.build());
connectionFactory.afterPropertiesSet();
return redisTemplate(connectionFactory);
}
}
package com.lanren.huhu.partner.config;
import com.lanren.huhu.partner.service.impl.RedissonLockService;
import com.lanren.huhu.partner.util.LockUtil;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* @author chen
* @title: RedissonConfig
* @projectName partner
* @description: 注入锁工具类
* @package com.lanren.huhu.partner.config
* @date 2019-07-01 19:41
*/
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
/**
* RedissonClient,单机模式
* @return
* @throws IOException
*/
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
return Redisson.create(config);
}
@Bean
public RedissonLockService redissonLocker(RedissonClient redissonClient){
RedissonLockService locker = new RedissonLockService(redissonClient);
//设置LockUtil的锁处理对象
try{
LockUtil.setLockService(locker);
} catch (Exception e) {
System.out.println("redissonLocker failed");
System.out.println(e.getMessage());
}
return locker;
}
}
...@@ -5,7 +5,6 @@ import com.lanren.huhu.partner.model.Agent; ...@@ -5,7 +5,6 @@ import com.lanren.huhu.partner.model.Agent;
import com.lanren.huhu.partner.model.AgentResponse; import com.lanren.huhu.partner.model.AgentResponse;
import com.lanren.huhu.partner.model.ParentAgent; import com.lanren.huhu.partner.model.ParentAgent;
import com.lanren.huhu.partner.result.Result; import com.lanren.huhu.partner.result.Result;
import com.lanren.huhu.partner.schedule.AgentDailyExpandTask;
import com.lanren.huhu.partner.schedule.AgentSettleTask; import com.lanren.huhu.partner.schedule.AgentSettleTask;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -28,8 +27,6 @@ public class AgentController { ...@@ -28,8 +27,6 @@ public class AgentController {
@Autowired @Autowired
AgentManager agentManager; AgentManager agentManager;
@Autowired @Autowired
AgentDailyExpandTask agentDailyExpandTask;
@Autowired
AgentSettleTask agentSettleTask; AgentSettleTask agentSettleTask;
@RequestMapping(value = "/level", method = RequestMethod.POST) @RequestMapping(value = "/level", method = RequestMethod.POST)
...@@ -61,14 +58,6 @@ public class AgentController { ...@@ -61,14 +58,6 @@ public class AgentController {
} }
return agentManager.getPingtuiParentList(agent.getAgentId()); return agentManager.getPingtuiParentList(agent.getAgentId());
} }
@RequestMapping(value = "/expand/{dat}", method = RequestMethod.GET)
public Result<String> updateOneDayExpand(@PathVariable("dat") String dat) {
agentDailyExpandTask.runSummary(dat);
logger.info("============>" + Thread.currentThread().getName());
Result<String> result = new Result<String>();
result.setData("异步,正在执行刷新......");
return result;
}
@RequestMapping(value = "/finance/{agentId}", method = RequestMethod.GET) @RequestMapping(value = "/finance/{agentId}", method = RequestMethod.GET)
public Result<String> doFinanceSettle(@PathVariable("agentId") String agentId, @RequestHeader HttpHeaders headers) { public Result<String> doFinanceSettle(@PathVariable("agentId") String agentId, @RequestHeader HttpHeaders headers) {
......
package com.lanren.huhu.partner.schedule;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.lanren.huhu.partner.domain.AgentExpandDetail;
import com.lanren.huhu.partner.domain.PartnerInviteRelation;
import com.lanren.huhu.partner.model.ParentAgent;
import com.lanren.huhu.partner.service.AgentExpandDailyService;
import com.lanren.huhu.partner.service.AgentExpandDetailService;
import com.lanren.huhu.partner.service.PartnerInviteRelationService;
import com.lanren.huhu.partner.service.UserService;
import com.lanren.huhu.partner.util.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import static com.lanren.huhu.partner.constants.Constants.PARENT_COLUMN_NAME_CENGJI;
/**
* @author chen
* @title: AgentDailyExpandTask
* @projectName partner
* @description: 代理商每日拓展用户数, 按用户的代理商层次查找归属
* @package com.lanren.huhu.partner.schedule
* @date 2019-07-08 15:50
*/
@Component
public class AgentDailyExpandTask {
private static Logger logger = LoggerFactory.getLogger(AgentDailyExpandTask.class);
private static final int PAGE_SIZE = 1000;
private static final long DELAY = 2 * 60 * 60 * 1000L;
@Autowired
PartnerInviteRelationService partnerInviteRelationService;
@Autowired
UserService userService;
@Autowired
AgentExpandDetailService agentExpandDetailService;
@Autowired
AgentExpandDailyService agentExpandDailyService;
// @Scheduled(fixedDelay = DELAY)
public void runScheduledTask() {
try {
Date dat = new Date(System.currentTimeMillis() - DELAY);
String datStr = DateUtils.format(dat, DateUtils.FORMAT_SHORT);
logger.info("run AgentDailyExpandTask {}", datStr);
runSummary(datStr);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
@Async
public void runSummary(String datStr) {
try {
Date dat = DateUtils.parse(datStr, DateUtils.FORMAT_SHORT);
int now = (int)(System.currentTimeMillis() / 1000L);
int pageNo = 1;
int total = 0;
int totalPage = 0;
int cnt = 0;
long beginTs = DateUtils.parse(datStr, DateUtils.FORMAT_SHORT).getTime() / 1000L;
long endTs = beginTs + 24 * 60 * 60L - 1L;
while (true) {
Page<PartnerInviteRelation> page = new Page<PartnerInviteRelation>(pageNo, PAGE_SIZE);
IPage<PartnerInviteRelation> ipage = partnerInviteRelationService.selectPageByTs(page, beginTs, endTs);
if (total == 0) {
total = (int) ipage.getTotal();
totalPage = total % PAGE_SIZE == 0 ? total / PAGE_SIZE : total / PAGE_SIZE + 1;
logger.info("totalPage is {} ", totalPage);
}
List<PartnerInviteRelation> partnerInviteRelationList = ipage.getRecords();
if (partnerInviteRelationList != null && partnerInviteRelationList.size() > 0) {
for (PartnerInviteRelation partnerInviteRelation : partnerInviteRelationList) {
List<ParentAgent> parentAgentList = userService.getAgentListByUserId(partnerInviteRelation.getUserId(), PARENT_COLUMN_NAME_CENGJI);
for (ParentAgent parentAgent : parentAgentList) {
AgentExpandDetail detail = new AgentExpandDetail();
detail.setDat(dat);
detail.setAgentId(parentAgent.getAgentId());
detail.setUserId(parentAgent.getUserId());
detail.setExpandUserId(partnerInviteRelation.getUserId());
detail.setExpandUserRegTime(new Date(partnerInviteRelation.getRegiterTime() * 1000L));
detail.setCreatedAt(now);
detail.setUpdatedAt(now);
agentExpandDetailService.insertOrUpdate(detail);
}
cnt++;
}
if (partnerInviteRelationList.size() > 0) {
agentExpandDailyService.updateByDatByDetail(datStr);
}
}
if (pageNo >= totalPage) {
break;
}
pageNo++;
}
logger.info("cnt is {}", cnt);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
package com.lanren.huhu.partner.schedule;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lanren.huhu.partner.domain.AgentReward;
import com.lanren.huhu.partner.domain.PartnerAccount;
import com.lanren.huhu.partner.domain.UserAgent;
import com.lanren.huhu.partner.model.AgentRewardMessage;
import com.lanren.huhu.partner.model.AgentRewardQueueMessage;
import com.lanren.huhu.partner.model.ParentAgent;
import com.lanren.huhu.partner.service.AgentRewardService;
import com.lanren.huhu.partner.service.PartnerAccountService;
import com.lanren.huhu.partner.service.UserAgentService;
import com.lanren.huhu.partner.service.UserService;
import com.lanren.huhu.partner.util.DateUtils;
import com.lanren.huhu.partner.util.LockUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.StringUtils;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.lanren.huhu.partner.constants.Constants.*;
/**
* @author chen
* @title: AgentRewardQueueTask
* @projectName partner
* @description: 消费 代理商奖励的消息队列, 处理代理商奖励, 写到agent_reward
* @package com.lanren.huhu.partner.schedule
* @date 2019-06-29 15:39
*/
@Component
public class AgentRewardQueueTask {
private static Logger logger = LoggerFactory.getLogger(AgentRewardQueueTask.class);
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
UserAgentService userAgentService;
@Autowired
UserService userService;
@Autowired
AgentRewardService agentRewardService;
@Autowired
PartnerAccountService partnerAccountService;
@Autowired
RedisTemplate redisTemplatePush;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
// @Scheduled(fixedDelay = 5000L)
public void runScheduledTask() {
logger.info("run AgentRewardQueueTask");
runConsume();
}
private void runConsume() {
ListOperations<String, String> ops = stringRedisTemplate.opsForList();
while(null != ops && null != ops.size(AGENT_REWARD_QUEUE_KEY) && ops.size(AGENT_REWARD_QUEUE_KEY) > 0L) {
String msg = "";
try {
/**
* 加分布式锁
*/
LockUtil.lock(DISTRIBUTE_REDIS_LOCK_KEY);
try {
msg = ops.rightPop(AGENT_REWARD_QUEUE_KEY);
} catch (Exception e) {
logger.error(e.getMessage(), e);
continue;
} finally{
LockUtil.unlock(DISTRIBUTE_REDIS_LOCK_KEY);
}
logger.info("msg is {}", msg);
JSONObject json = JSON.parseObject(msg);
AgentRewardMessage message = json.toJavaObject(AgentRewardMessage.class);
processReward(message);
} catch (Exception e) {
logger.error("process agent reward message failed", msg);
logger.error(e.getMessage(), e);
}
}
}
@Async
public void processReward(AgentRewardMessage message) {
ListOperations<String, String> ops = stringRedisTemplate.opsForList();
logger.info("异步处理 agent reward: {}", message);
int rewardType = message.getRewardType();
Map<Integer, BigDecimal> rateMap;
if (!AGENT_RATE_MAP.keySet().contains(rewardType)) {
logger.info("未知的奖励类型 跳过不处理: {}", message);
return;
} else {
if ((rewardType == AGENT_REWARD_TYPE_SELF_ORDER || rewardType == AGENT_REWARD_TYPE_SHARE_ORDER)
&& message.getOrderType().equals(ORDER_TYPE_JD) ) {
rateMap = AGENT_RATE_MAP.get(rewardType + JD_OFFSET);
} else if (rewardType == AGENT_REWARD_TYPE_SELF_ORDER
&& message.getOrderType().equals(ORDER_TYPE_MT) ) {
rateMap = AGENT_RATE_MAP.get(rewardType + MT_OFFSET);
} else {
rateMap = AGENT_RATE_MAP.get(rewardType);
}
}
UserAgent userAgent = userAgentService.getOneByAgentId(message.getAgentId());
if (userAgent == null) {
logger.info("代理商id: {} 不存在, 跳过不处理", message);
return;
}
// if (userAgent.getAgentLevel() != AGENT_LEVEL_4 &&
// (rewardType == AGENT_REWARD_TYPE_SELF_ORDER || rewardType == AGENT_REWARD_TYPE_SHARE_ORDER || rewardType == AGENT_REWARD_TYPE_REDPACK) ) {
// logger.info("奖励类型{}, 代理商id: {} 不是城市代理, 跳过不处理", rewardType, message);
// return;
// }
/**
* 直属层级代理 是总代 没有平推, 是授权公司 只奖励自购省和分享赚
*/
if (userAgent.getAgentLevel() == AGENT_LEVEL_1 ||
(userAgent.getAgentLevel() == AGENT_LEVEL_2 && (rewardType != AGENT_REWARD_TYPE_SELF_ORDER && rewardType != AGENT_REWARD_TYPE_SHARE_ORDER))) {
logger.info("奖励类型{}, 代理商id: {} , 不在奖励范围内, 跳过不处理", rewardType, message);
return;
}
/**
* 这里的逻辑有问题, 不能用这个接口找上级, 因为会先找一次直接邀请人,
* 只能按message.getAgentId()的代理商id, 找user_agent表里的平推关系
* 已修复
*/
ArrayList<ParentAgent> parentList = (ArrayList<ParentAgent>) userService.getAgentListByUserId(userAgent.getUserId(), PARENT_COLUMN_NAME_PINGTUI);
if (parentList.size() > 0) {
/**
* 扫描agentlist中的平推城市代理
* 最多只需要找2个
*/
int rewardCnt = 0;
ArrayList<ParentAgent> rewardList = new ArrayList<ParentAgent>();
for (ParentAgent agent : parentList) {
/**
* 奖励次数不够2, 写到rewardList
*/
if (rewardCnt < 2){
rewardList.add(agent);
rewardCnt++;
/**
* 如果找到总代 就不再往上找了 都给总代
*/
if (agent.getLevel() == AGENT_LEVEL_1) {
break;
}
/**
* 如果是AGENT_LEVEL_2 奖励类型 750 或 760, 只奖励1个人 并且只拿1份
*/
if (userAgent.getAgentLevel() == AGENT_LEVEL_2 && (rewardType == AGENT_REWARD_TYPE_SELF_ORDER || rewardType == AGENT_REWARD_TYPE_SHARE_ORDER)) {
break;
}
} else {
break;
}
}
if (rewardList.size() > 2) {
logger.error("代理商推荐奖励处理异常 奖励人数超出2人, 奖励消息: {}", message);
return;
}
/**
* 如果上面检查通过, 证明代理商关系没问题, 后面处理只看rewardList的size
* 决定比例怎么分: 1个人-拿2份; 2个人-各1份
*/
List<BigDecimal> rateList = new ArrayList<BigDecimal>();
if (rewardList.size() == 1) {
/**
* 如果直属层级代理商是AGENT_LEVEL_2 奖励类型 750 或 760 只拿1份
*/
if (userAgent.getAgentLevel() == AGENT_LEVEL_2 &&
(rewardType == AGENT_REWARD_TYPE_SELF_ORDER || rewardType == AGENT_REWARD_TYPE_SHARE_ORDER) ) {
rateList.add(rateMap.get(0));
} else {
rateList.add(rateMap.get(0).add(rateMap.get(1)));
}
} else if (rewardList.size() == 2) {
if (userAgent.getAgentLevel() == AGENT_LEVEL_2 &&
(rewardType == AGENT_REWARD_TYPE_SELF_ORDER || rewardType == AGENT_REWARD_TYPE_SHARE_ORDER) ) {
logger.error("代理商推荐奖励逻辑处理错误, 直属代理商id: {}, rewardList[1]: {}", userAgent.getAgentId(), rewardList.get(1));
}
rateList.add(rateMap.get(0));
rateList.add(rateMap.get(1));
}
TransactionStatus transactionStatus = null;
List<AgentReward> pushList = new ArrayList<AgentReward>();
// List<AgentReward> pushRewardList = new ArrayList<AgentReward>();
try {
/**
* 开启事务, 如果rewardList中的奖励 都插入成功, 才提交事务
* 并发送redis消息, 否则, 进行回滚, 并把读入的消息塞回队列
*/
transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
for (int i=0; i<rewardList.size(); i++) {
BigDecimal commissionRate = rateList.get(i);
AgentReward reward = doInsert(commissionRate, rewardList.get(i), message.clone());
if (reward != null) {
pushList.add(reward);
/**
* 如果是城市代理获得的佣金 或 红包奖励 需要 再往上分
* 已废弃
*/
// if (reward.getAgentLevel() == AGENT_LEVEL_4 &&
// (rewardType == AGENT_REWARD_TYPE_SELF_ORDER || rewardType == AGENT_REWARD_TYPE_SHARE_ORDER || rewardType == AGENT_REWARD_TYPE_REDPACK)
// ) {
// pushRewardList.add(reward);
// }
}
}
/**
* 提交事务
*/
dataSourceTransactionManager.commit(transactionStatus);
/**
* 数据库写入成功后 发推送
*/
for (AgentReward reward : pushList) {
doPush(reward);
}
// for (AgentReward reward : pushRewardList) {
// pushRewardQueue(reward, message.clone());
// }
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("插入代理商分成奖励失败, 奖励 {}", message);
/**
* 回滚事务
*/
if (transactionStatus != null) {
dataSourceTransactionManager.rollback(transactionStatus);
}
/**
* 消息处理失败 丢回队列
*/
ops.leftPush(AGENT_REWARD_QUEUE_KEY, JSON.toJSONString(message));
/**
* 休眠
*/
try {
Thread.sleep(5 * 1000L);
} catch (Exception e1) {
logger.error(e.getMessage(), e1);
}
}
}
}
// private void pushRewardQueue(AgentReward reward, AgentRewardMessage message) {
// logger.info("处理循环上供奖励.......");
// logger.info("reward is :{}", reward);
// logger.info("message is :{}", message);
// message.setAgentId(reward.getAgentId());
// message.setAgentReward(reward.getAmount());
// String rateArray;
// if (StringUtils.isEmpty(reward.getRatioAll())) {
// rateArray = "[" + reward.getCommissionRate() + "]";
// } else {
// rateArray = reward.getRatioAll().replaceAll("\\]", "");
// rateArray = rateArray + reward.getCommissionRate() + "]";
// }
// message.setOrderRateArray(rateArray);
// logger.info("推送循环上供奖励: {}", message);
// stringRedisTemplate.opsForList().leftPush(AGENT_REWARD_QUEUE_KEY, JSON.toJSONString(message));
// }
private AgentReward doInsert(BigDecimal commissionRate, ParentAgent agent, AgentRewardMessage message) {
try {
int rewardType = message.getRewardType();
int sourceUserLevel = partnerAccountService.getOneByUserId(message.getSourceUserId()).getPartnerLevel();
AgentReward agentReward = new AgentReward();
agentReward.setRewardType(message.getRewardType());
agentReward.setRechargeTime(DateUtils.parse(message.getRechargeTime(), DateUtils.FORMAT_LONG));
agentReward.setSourceUserId(message.getSourceUserId());
agentReward.setReferenceId(Long.parseLong(message.getReferenceId()));
agentReward.setAgentId(agent.getAgentId());
agentReward.setSettleState(message.getSettleState());
agentReward.setOrderType(message.getOrderType());
agentReward.setOrderSn(message.getOrderSn());
agentReward.setSubOrderSn(message.getSubOrderSn());
agentReward.setGoodsId(message.getOrderGoodsId());
agentReward.setTitle(message.getOrderTitle());
agentReward.setAgentLevel(agent.getLevel());
agentReward.setUserId(agent.getUserId());
agentReward.setRewardRemark(AGENT_REWARD_REMARK.get(message.getRewardType()));
if (StringUtils.isEmpty(message.getSourceOrder())) {
agentReward.setSourceOrder(message.getReferenceId());
} else {
agentReward.setSourceOrder(message.getSourceOrder());
}
agentReward.setSourceUserLevel(sourceUserLevel);
agentReward.setCreatedAt(System.currentTimeMillis() / 1000L);
agentReward.setCommissionRate(commissionRate);
agentReward.setRewardTypeChild(message.getRewardTypeChild() == null ? "" : message.getRewardTypeChild());
/**
* agent_reward里:
* money 面额(message里, 需要上供的代理商自己获得的奖励)
* commission_account 面额
* amount 奖励金额
* commission 奖励金额
* cash_code 支付金额
*/
BigDecimal rewardBasement;
// if (rewardType == AGENT_REWARD_TYPE_YEAR_VIP || rewardType == AGENT_REWARD_TYPE_HALF_YEAR_VIP || rewardType == AGENT_REWARD_TYPE_SVIP) {
if (rewardType == AGENT_REWARD_TYPE_YEAR_VIP || rewardType == AGENT_REWARD_TYPE_REDPACK) {
rewardBasement = message.getSourceUserPayment();
} else {
/**
* 订单相关, 还需要写入两个字段:all_money_ori 原始佣金, 继承下来的比例数组
*/
rewardBasement = message.getOrderCommission();
agentReward.setAllMoneyOri(message.getOrderCommission());
agentReward.setRatioAll("");
}
logger.info("rewardBasement is: {}", rewardBasement);
logger.info("commissionRate is: {}", commissionRate);
BigDecimal reward = rewardBasement.multiply(commissionRate);
/**
* 小于0.000001的奖励 不写入
*/
if (reward.compareTo(MIN_REWARD) < 0) {
logger.info("代理商({})平推奖励金额小于0.000001, 忽略不写入. msg:{}", agent.getAgentId(), message);
return null;
} else {
agentReward.setMoney(rewardBasement);
agentReward.setCommissionAcount(rewardBasement);
agentReward.setAmount(reward);
agentReward.setCommission(reward);
agentReward.setCashCode("0");
agentRewardService.save(agentReward);
return agentReward;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
}
}
private void doPush(AgentReward agentReward) {
try {
JSONObject json = new JSONObject();
PartnerAccount partnerAccount = partnerAccountService.getOneByUserId(agentReward.getUserId());
AgentRewardQueueMessage message = new AgentRewardQueueMessage();
message.setUserId(agentReward.getUserId());
message.setSourceUserId(agentReward.getSourceUserId());
message.setMoney(agentReward.getCommission().toString());
message.setPartnerLevel(partnerAccount.getIsSuperPartner() == 1 ? 30 : partnerAccount.getPartnerLevel());
message.setTime(agentReward.getCreatedAt());
message.setRewardType(agentReward.getRewardType());
redisTemplatePush.opsForList().leftPush(AGENT_REWARD_PUSH_KEY, message);
logger.info("推送奖励通知 {}", message);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
package com.lanren.huhu.partner.schedule;
import com.lanren.huhu.partner.domain.AgentRevokeRecords;
import com.lanren.huhu.partner.domain.AgentReward;
import com.lanren.huhu.partner.service.AgentRevokeRecordsService;
import com.lanren.huhu.partner.service.AgentRewardService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import static com.lanren.huhu.partner.constants.Constants.*;
/**
* @author chen
* @title: RevokeAgentRewardTask
* @projectName partner
* @description: 扣回代理商的奖励
* @package com.lanren.huhu.partner.schedule
* @date 2019-07-04 11:41
*/
@Component
public class RevokeAgentRewardTask {
private static Logger logger = LoggerFactory.getLogger(RevokeAgentRewardTask.class);
@Autowired
AgentRevokeRecordsService agentRevokeRecordsService;
@Autowired
AgentRewardService agentRewardService;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
// @Scheduled(fixedDelay = 5000L)
public void runScheduledTask() {
logger.info("run RevokeAgentRewardTask");
runConsume();
}
private void runConsume() {
List<AgentRevokeRecords> revokeRecordList = null;
try {
revokeRecordList = agentRevokeRecordsService.getAllPending();
if (null == revokeRecordList || revokeRecordList.size() == 0) {
return;
}
process(revokeRecordList);
} catch (Exception e) {
logger.error("process revoke failed. {}", revokeRecordList);
logger.error(e.getMessage(), e);
}
}
@Async
public void process(List<AgentRevokeRecords> revokeRecordList) {
for (AgentRevokeRecords agentRevokeRecord : revokeRecordList) {
process(agentRevokeRecord);
}
}
private void process(AgentRevokeRecords revokeRecord) {
TransactionStatus transactionStatus = null;
try {
int revokeType = revokeRecord.getRevokeType();
if (!AGENT_REVOKE_TYPE_MAP.keySet().contains(revokeType)) {
logger.info("未知的奖励类型 跳过不处理: {}", revokeRecord);
return;
}
/**
* 开启事务
*/
transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
List<AgentReward> agentRewardList = null;
List<AgentReward> insertList = new ArrayList<AgentReward>();
List<AgentReward> updateList = new ArrayList<AgentReward>();
if (revokeType == AGENT_REVOKE_TYPE_ACTIVATION || revokeType == AGENT_REVOKE_TYPE_SELF_ORDER) {
agentRewardList = agentRewardService.getListByRefId(revokeRecord.getRefId());
} else if (revokeType == AGENT_REVOKE_TYPE_SHARE_ORDER) {
agentRewardList = agentRewardService.getListBySubOrderSn(revokeRecord.getOrderType(), revokeRecord.getOrderSn(), revokeRecord.getSubOrderSn());
} else {
revokeRecord.setState(REVOKE_STATE_DO_NOTHING);
}
/**
* 已结算状态 插负, 未结算状态 抹平
*/
if (null != agentRewardList && agentRewardList.size() > 0) {
for (AgentReward agentReward : agentRewardList) {
if (agentReward.getSettleState() == SETTLE_STATE_DONE) {
agentReward.setAmount(agentReward.getAmount().negate());
agentReward.setCommission(agentReward.getCommission().negate());
insertList.add(agentReward);
} else if (agentReward.getSettleState() == SETTLE_STATE_ON_THE_WAY || agentReward.getSettleState() == SETTLE_STATE_PRE) {
agentReward.setAmount(new BigDecimal(0));
agentReward.setCommission(new BigDecimal(0));
updateList.add(agentReward);
}
}
if (updateList.size() > 0) {
agentRewardService.updateBatch(updateList);
}
if (insertList.size() > 0) {
agentRewardService.batchInsert(insertList);
}
revokeRecord.setState(REVOKE_STATE_DONE);
} else {
revokeRecord.setState(REVOKE_STATE_DO_NOTHING);
}
agentRevokeRecordsService.updateById(revokeRecord);
/**
* 提交事务
*/
dataSourceTransactionManager.commit(transactionStatus);
} catch (Exception e) {
/**
* 回滚事务
*/
dataSourceTransactionManager.rollback(transactionStatus);
logger.error(e.getMessage(), e);
revokeRecord.setState(REVOKE_STATE_FAILED);
agentRevokeRecordsService.updateById(revokeRecord);
}
}
}
package com.lanren.huhu.partner.service;
import java.util.concurrent.TimeUnit;
/**
* @author chen
* @title: LockService
* @projectName partner
* @description: 锁接口
* @package com.lanren.huhu.partner.service
* @date 2019-07-01 19:37
*/
public interface LockService {
/**
* 获取锁,如果锁不可用,则当前线程处于休眠状态,直到获得锁为止。
*
* @param lockKey
*/
void lock(String lockKey);
/**
* 释放锁
*
* @param lockKey
*/
void unlock(String lockKey);
/**
* 获取锁,如果锁不可用,则当前线程处于休眠状态,直到获得锁为止。如果获取到锁后,执行结束后解锁或达到超时时间后会自动释放锁
*
* @param lockKey
* @param timeout
*/
void lock(String lockKey, int timeout);
/**
* 获取锁,如果锁不可用,则当前线程处于休眠状态,直到获得锁为止。如果获取到锁后,执行结束后解锁或达到超时时间后会自动释放锁
*
* @param lockKey
* @param unit
* @param timeout
*/
void lock(String lockKey, TimeUnit unit, int timeout);
/**
* 尝试获取锁,获取到立即返回true,未获取到立即返回false
*
* @param lockKey
* @return
*/
boolean tryLock(String lockKey);
/**
* 尝试获取锁,在等待时间内获取到锁则返回true,否则返回false,如果获取到锁,则要么执行完后程序释放锁,
* 要么在给定的超时时间leaseTime后释放锁
*
* @param lockKey
* @param waitTime
* @param leaseTime
* @param unit
* @return
*/
boolean tryLock(String lockKey, long waitTime, long leaseTime, TimeUnit unit)
throws InterruptedException;
/**
* 锁是否被任意一个线程锁持有
*
* @param lockKey
* @return
*/
boolean isLocked(String lockKey);
}
package com.lanren.huhu.partner.service.impl;
import com.lanren.huhu.partner.service.LockService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
/**
* @author chen
* @title: RedissonLockService
* @projectName partner
* @description: 基于redisson的锁实现
* @package com.lanren.huhu.partner.service.impl
* @date 2019-07-01 19:38
*/
public class RedissonLockService implements LockService {
private RedissonClient redissonClient;
public RedissonLockService(RedissonClient redissonClient) {
super();
this.redissonClient = redissonClient;
}
@Override
public void lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
}
@Override
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
@Override
public void lock(String lockKey, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(leaseTime, TimeUnit.SECONDS);
}
@Override
public void lock(String lockKey, TimeUnit unit, int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, unit);
}
public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
@Override
public boolean tryLock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
return lock.tryLock();
}
@Override
public boolean tryLock(String lockKey, long waitTime, long leaseTime,
TimeUnit unit) throws InterruptedException{
RLock lock = redissonClient.getLock(lockKey);
return lock.tryLock(waitTime, leaseTime, unit);
}
@Override
public boolean isLocked(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
return lock.isLocked();
}
}
package com.lanren.huhu.partner.util;
import com.lanren.huhu.partner.service.LockService;
import java.util.concurrent.TimeUnit;
/**
* @author chen
* @title: LockUtil
* @projectName partner
* @description: 分布式锁工具类
* @package com.lanren.huhu.partner.util
* @date 2019-07-01 19:39
*/
public class LockUtil {
private static LockService lockService;
/**
* 设置工具类使用的locker
* @param lockService
*/
public static void setLockService(LockService lockService) {
LockUtil.lockService = lockService;
}
/**
* 获取锁
* @param lockKey
*/
public static void lock(String lockKey) {
try {
lockService.lock(lockKey);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 释放锁
* @param lockKey
*/
public static void unlock(String lockKey) {
lockService.unlock(lockKey);
}
/**
* 获取锁,超时释放
* @param lockKey
* @param timeout
*/
public static void lock(String lockKey, int timeout) {
lockService.lock(lockKey, timeout);
}
/**
* 获取锁,超时释放,指定时间单位
* @param lockKey
* @param unit
* @param timeout
*/
public static void lock(String lockKey, TimeUnit unit, int timeout) {
lockService.lock(lockKey, unit, timeout);
}
/**
* 尝试获取锁,获取到立即返回true,获取失败立即返回false
* @param lockKey
* @return
*/
public static boolean tryLock(String lockKey) {
return lockService.tryLock(lockKey);
}
/**
* 尝试获取锁,在给定的waitTime时间内尝试,获取到返回true,获取失败返回false,获取到后再给定的leaseTime时间超时释放
* @param lockKey
* @param waitTime
* @param leaseTime
* @param unit
* @return
* @throws InterruptedException
*/
public static boolean tryLock(String lockKey, long waitTime, long leaseTime,
TimeUnit unit) throws InterruptedException {
return lockService.tryLock(lockKey, waitTime, leaseTime, unit);
}
/**
* 锁释放被任意一个线程持有
* @param lockKey
* @return
*/
public static boolean isLocked(String lockKey) {
return lockService.isLocked(lockKey);
}
}
...@@ -51,31 +51,6 @@ spring: ...@@ -51,31 +51,6 @@ spring:
active: @profiles.active@ active: @profiles.active@
jmx: jmx:
default-domain: @project.artifactId@ default-domain: @project.artifactId@
## Redis 配置
## Redis数据库索引(默认为0)
redis:
database: 13
database2: 5
## Redis测试服务器地址
host: @redis.host@
## Redis测试服务器连接端口
port: 6379
## Redis测试服务器连接密码(默认为空)
password: @redis.password@
## 连接池最大连接数(使用负值表示没有限制)
lettuce:
pool:
max-active: 8
## 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
## 连接池中的最大空闲连接
max-idle: 8
## 连接池中的最小空闲连接
min-idle: 0
shutdown-timeout: 100ms
## 连接超时时间(毫秒)
timeout: 5000ms
http: http:
encoding: encoding:
charset: UTF-8 charset: UTF-8
......
...@@ -791,7 +791,7 @@ ...@@ -791,7 +791,7 @@
SUM(case when recharge_time &lt; '2019-08-16 12:03:50' and agent_level=4 and reward_type in (60,760,300,750) then amount * 0.06 else 0 end) techChargeOrder, SUM(case when recharge_time &lt; '2019-08-16 12:03:50' and agent_level=4 and reward_type in (60,760,300,750) then amount * 0.06 else 0 end) techChargeOrder,
SUM(case when reward_type in (40) then amount else 0 end) opencardBalance, SUM(case when reward_type in (40) then amount else 0 end) opencardBalance,
SUM(case when reward_type in (50,740) then amount else 0 end) rechargeBalance, SUM(case when reward_type in (50,740) then amount else 0 end) rechargeBalance,
SUM(case when reward_type in (30,710,720,210,80,230 ,780) then amount else 0 end) upgradeBalance, SUM(case when reward_type in (30,710,720,210,80,230 ,780,290,790) then amount else 0 end) upgradeBalance,
SUM(case when reward_type in (20,730) then amount else 0 end) upgradeSuperBalance, SUM(case when reward_type in (20,730) then amount else 0 end) upgradeSuperBalance,
SUM(case when reward_type in (60,760) then amount else 0 end) shareBalance, SUM(case when reward_type in (60,760) then amount else 0 end) shareBalance,
SUM(case when reward_type in (300,750) then amount else 0 end) zigoushengBalance, SUM(case when reward_type in (300,750) then amount else 0 end) zigoushengBalance,
...@@ -802,8 +802,8 @@ ...@@ -802,8 +802,8 @@
COUNT(reward_type IN (50, 740) OR NULL) AS onlinerechargeNum, COUNT(reward_type IN (50, 740) OR NULL) AS onlinerechargeNum,
SUM(if(reward_type=40, amount, 0)) AS open_card_cash, SUM(if(reward_type=40, amount, 0)) AS open_card_cash,
COUNT(reward_type=40 OR NULL) AS open_card_num, COUNT(reward_type=40 OR NULL) AS open_card_num,
SUM(if(reward_type IN (30, 20, 710, 720, 730, 210, 80, 230 ,780),amount, 0)) AS upgrade_partner_cash, SUM(if(reward_type IN (30, 20, 710, 720, 730, 210, 80, 230 ,780,290,790),amount, 0)) AS upgrade_partner_cash,
COUNT(reward_type IN (30, 20, 710, 720, 730, 210, 80, 230 ,780) OR NULL) AS upgrade_partner_num, COUNT(reward_type IN (30, 20, 710, 720, 730, 210, 80, 230 ,780,290,790) OR NULL) AS upgrade_partner_num,
SUM(if(reward_type IN (50, 30, 20),cash_code, 0)) AS marketPerformance, SUM(if(reward_type IN (50, 30, 20),cash_code, 0)) AS marketPerformance,
SUM(if(reward_type IN (30, 20),cash_code, 0)) AS upgrade_market_cash, SUM(if(reward_type IN (30, 20),cash_code, 0)) AS upgrade_market_cash,
SUM(if(reward_type=50, cash_code, 0)) AS recharge_market_cash SUM(if(reward_type=50, cash_code, 0)) AS recharge_market_cash
...@@ -854,7 +854,7 @@ ...@@ -854,7 +854,7 @@
SUM(case when recharge_time &lt; '2019-08-16 12:03:50' and agent_level=4 and reward_type in (60,760,300,750) then amount * 0.06 else 0 end) techChargeOrder, SUM(case when recharge_time &lt; '2019-08-16 12:03:50' and agent_level=4 and reward_type in (60,760,300,750) then amount * 0.06 else 0 end) techChargeOrder,
SUM(case when reward_type in (40) then amount else 0 end) opencardBalance, SUM(case when reward_type in (40) then amount else 0 end) opencardBalance,
SUM(case when reward_type in (50,740) then amount else 0 end) rechargeBalance, SUM(case when reward_type in (50,740) then amount else 0 end) rechargeBalance,
SUM(case when reward_type in (30,710,720,210, 80, 230 ,780) then amount else 0 end) upgradeBalance, SUM(case when reward_type in (30,710,720,210, 80, 230 ,780,290,790) then amount else 0 end) upgradeBalance,
SUM(case when reward_type in (20,730) then amount else 0 end) upgradeSuperBalance, SUM(case when reward_type in (20,730) then amount else 0 end) upgradeSuperBalance,
SUM(case when reward_type in (60,760) then amount else 0 end) shareBalance, SUM(case when reward_type in (60,760) then amount else 0 end) shareBalance,
SUM(case when reward_type in (300,750) then amount else 0 end) zigoushengBalance, SUM(case when reward_type in (300,750) then amount else 0 end) zigoushengBalance,
...@@ -865,8 +865,8 @@ ...@@ -865,8 +865,8 @@
COUNT(reward_type IN (50, 740) OR NULL) AS onlinerechargeNum, COUNT(reward_type IN (50, 740) OR NULL) AS onlinerechargeNum,
SUM(if(reward_type=40, amount, 0)) AS open_card_cash, SUM(if(reward_type=40, amount, 0)) AS open_card_cash,
COUNT(reward_type=40 OR NULL) AS open_card_num, COUNT(reward_type=40 OR NULL) AS open_card_num,
SUM(if(reward_type IN (30, 20, 710, 720, 730, 210, 80, 230 ,780),amount, 0)) AS upgrade_partner_cash, SUM(if(reward_type IN (30, 20, 710, 720, 730, 210, 80, 230 ,780,290,790),amount, 0)) AS upgrade_partner_cash,
COUNT(reward_type IN (30, 20, 710, 720, 730, 210, 80, 230 ,780) OR NULL) AS upgrade_partner_num, COUNT(reward_type IN (30, 20, 710, 720, 730, 210, 80, 230 ,780,290,790) OR NULL) AS upgrade_partner_num,
SUM(if(reward_type IN (50, 30, 20),cash_code, 0)) AS marketPerformance, SUM(if(reward_type IN (50, 30, 20),cash_code, 0)) AS marketPerformance,
SUM(if(reward_type IN (30, 20),cash_code, 0)) AS upgrade_market_cash, SUM(if(reward_type IN (30, 20),cash_code, 0)) AS upgrade_market_cash,
SUM(if(reward_type=50, cash_code, 0)) AS recharge_market_cash SUM(if(reward_type=50, cash_code, 0)) AS recharge_market_cash
......
...@@ -874,8 +874,8 @@ ...@@ -874,8 +874,8 @@
0 bank_tax, 0 bank_tax,
SUM(if(reward_type=60 AND is_show_on_client=1, commission_acount, 0)) AS cash, SUM(if(reward_type=60 AND is_show_on_client=1, commission_acount, 0)) AS cash,
COUNT(reward_type=60 OR NULL) AS num, COUNT(reward_type=60 OR NULL) AS num,
SUM(if(reward_type IN (70,210,220,320,230, 100) AND is_show_on_client=1,commission_acount, 0)) AS upCash, SUM(if(reward_type IN (70,210,220,320,330,230, 100) AND is_show_on_client=1,commission_acount, 0)) AS upCash,
COUNT(reward_type IN (70,210,220,320,230, 100)OR NULL) AS upNum, COUNT(reward_type IN (70,210,220,320,330,230, 100)OR NULL) AS upNum,
SUM(if(reward_type=80 AND is_show_on_client=1, commission_acount, 0)) AS icash, SUM(if(reward_type=80 AND is_show_on_client=1, commission_acount, 0)) AS icash,
COUNT(reward_type=80 OR NULL) AS inum, COUNT(reward_type=80 OR NULL) AS inum,
SUM(if(reward_type=90 AND is_show_on_client=1, commission_acount, 0)) AS pcash, SUM(if(reward_type=90 AND is_show_on_client=1, commission_acount, 0)) AS pcash,
...@@ -916,8 +916,8 @@ ...@@ -916,8 +916,8 @@
0 bank_tax, 0 bank_tax,
SUM(if(reward_type=60 AND is_show_on_client=1, commission_acount, 0)) AS cash, SUM(if(reward_type=60 AND is_show_on_client=1, commission_acount, 0)) AS cash,
COUNT(reward_type=60 OR NULL) AS num, COUNT(reward_type=60 OR NULL) AS num,
SUM(if(reward_type IN (70,210,220,320,230, 100) AND is_show_on_client=1,commission_acount, 0)) AS upCash, SUM(if(reward_type IN (70,210,220,320,330,230, 100) AND is_show_on_client=1,commission_acount, 0)) AS upCash,
COUNT(reward_type IN (70,210,220,320,230, 100)OR NULL) AS upNum, COUNT(reward_type IN (70,210,220,320,330,230, 100)OR NULL) AS upNum,
SUM(if(reward_type=80 AND is_show_on_client=1, commission_acount, 0)) AS icash, SUM(if(reward_type=80 AND is_show_on_client=1, commission_acount, 0)) AS icash,
COUNT(reward_type=80 OR NULL) AS inum, COUNT(reward_type=80 OR NULL) AS inum,
SUM(if(reward_type=90 AND is_show_on_client=1, commission_acount, 0)) AS pcash, SUM(if(reward_type=90 AND is_show_on_client=1, commission_acount, 0)) AS pcash,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment