From: 三七二十四
Replies: 3
Created: 2017-03-22 16:51
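The default RDD persistence level is MEMORY_ONLY, and I cannot change it through persist. The call fails with "Cannot change storage level of an RDD after it was already assigned a level". How do I set the RDD persistence level for the whole system or for a single job?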
3 replies
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}
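In the code above, the check
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
  throw new UnsupportedOperationException(
    "Cannot change storage level of an RDD after it was already assigned a level")
}
is what raises the error. The main job of this method is to set storageLevel to the level passed to persist(). Once a storage level has been assigned to an RDD, assigning a different level to the same RDD throws this exception.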
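The behaviour described above can be reproduced with a minimal sketch like the one below. It assumes a local-mode SparkContext; the object name, app name, master URL and sample data are illustrative and not from the thread. It also shows one possible way around the error: calling unpersist() resets the RDD's level to StorageLevel.NONE, so a different level can be assigned afterwards.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistLevelDemo {
  // Returns (did the second persist throw?, level after unpersist + re-persist).
  def run(): (Boolean, StorageLevel) = {
    // Local master and app name are assumptions for illustration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("persist-level-demo")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 100)
      rdd.persist(StorageLevel.MEMORY_ONLY)

      // A different level on an already-persisted RDD hits the guard shown above.
      val threw =
        try { rdd.persist(StorageLevel.MEMORY_AND_DISK); false }
        catch { case _: UnsupportedOperationException => true }

      // unpersist() resets the level to NONE, so a new level is accepted.
      rdd.unpersist()
      rdd.persist(StorageLevel.MEMORY_AND_DISK)
      (threw, rdd.getStorageLevel)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (threw, level) = run()
    println(s"second persist threw: $threw, final level: $level")
  }
}
```

Note that unpersist() also drops any cached blocks, so the RDD is recomputed the next time an action runs on it.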
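Even NetworkWordCount.scala from the official 2.0.2 examples does not run as written; it only passes if I remove the storage level argument or change it to MEMORY_ONLY. If MEMORY_ONLY is the default and the level cannot be changed once it has been set, then when can this default be changed?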
ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK)
OP, try the following approach, for example:
sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
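A small sketch of why this suggestion works, under the same assumptions as a local-mode setup (the object name, app name and in-memory sample data are hypothetical; the sample data stands in for the HDFS file). A freshly created RDD reports StorageLevel.NONE; MEMORY_ONLY is only what persist() or cache() picks when no level is given. Passing the desired level in the first persist call therefore avoids the exception.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object FirstPersistDemo {
  // Returns (level before any persist call, level after the first persist call).
  def run(): (StorageLevel, StorageLevel) = {
    // Local master and app name are assumptions for illustration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("first-persist-demo")
    val sc = new SparkContext(conf)
    try {
      // In-memory data stands in for sc.textFile(...) so no HDFS is needed.
      val lines = sc.parallelize(Seq("hello", "world"))
      val before = lines.getStorageLevel

      // Choose the level on the first call instead of calling cache() first.
      lines.persist(StorageLevel.MEMORY_AND_DISK_SER)
      // Repeating the same level is accepted; only a different level throws.
      lines.persist(StorageLevel.MEMORY_AND_DISK_SER)

      (before, lines.getStorageLevel)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"before: $before, after: $after")
  }
}
```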